In [1]:
import tensorflow as tf
import numpy as np
import os

print(tf.__version__)

2.6.0


In [5]:
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

train_images = train_images[..., None]
test_images = test_images[..., None]

train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

tf.distributed.MirroredStrategy

- 모든 변수와 모델 그래프는 장치에 복제됨
- 입력이 장치에 고르게 분배
- 각 장치는 주어진 입력에 대해 loss, gradient 계산
- gradient를 모두 더함으로써 모든 장치들 간에 gradient 동기화
- 동기화된 후, 동일한 업데이트가 각 장치에 있는 변수의 복사본에 동일하게 적용

In [7]:
strategy = tf.distribute.MirroredStrategy()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


In [8]:
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

Number of devices: 1


입력 파이프라인 설정

In [9]:
BUFFER_SIZE = len(train_images)
BATCH_SIZE_PER_REPLICA= 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 10

In [10]:
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

모델 만들기

In [11]:
def create_model():
  model = tf.keras.Sequential([
                               tf.keras.layers.Conv2D(32, 3, activation='relu'),
                               tf.keras.layers.MaxPooling2D(),
                               tf.keras.layers.Conv2D(64, 3, activation='relu'),
                               tf.keras.layers.MaxPooling2D(),
                               tf.keras.layers.Flatten(),
                               tf.keras.layers.Dense(64, activation='relu'),
                               tf.keras.layers.Dense(10, activation = 'softmax')
  ])

  return model

In [12]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

손실함수 정의

1 GPU/CPU가 있는 단일 시스템에서 손실 : 입력 배치의 예제 수로 나뉨

4개 GPU, batch_size = 64일 때 : 하나의 입력 배치가 전체 복제본(4개의 GPU)에 걸쳐 분배되며 각 복제본은 크기 16의 입력을 받음
각 복제본의 모델은 해당 입력으로 순방향 전달을 수행하고 손실을 계산. 손실은 global_batch_size(64)로 나누어야 함

-> 각 복제본에서 gradient가 계산된 후 이를 합산하여 전체 복제본에 걸쳐 동기화가 됨


합산할 때는 tf.nn.scale_regularization_loss를 사용

In [21]:
with strategy.scope():
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size = GLOBAL_BATCH_SIZE)

In [22]:
# 손실과 정확도를 기록하기 위한 지표 정의

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name = 'test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name = 'train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name = 'test_accuracy')

In [23]:
# 훈련 루프

with strategy.scope():
  model = create_model()
  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer = optimizer, model = model)

In [24]:
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training = True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training = False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)

In [26]:
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis = None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0

  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  for x in test_dist_dataset:
    distributed_test_step(x)

    if epoch % 2 == 0:
      checkpoint.save(checkpoint_prefix)
    template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()


  '"`sparse_categorical_crossentropy` received `from_logits=True`, but '


Epoch 1, Loss: 0.3339049220085144, Accuracy: 84.73500061035156, Test Loss: 0.3538040518760681, Test Accuracy: 87.08267211914062
Epoch 2, Loss: 0.28882819414138794, Accuracy: 89.43167114257812, Test Loss: 0.3005160987377167, Test Accuracy: 89.27999877929688
Epoch 3, Loss: 0.25879883766174316, Accuracy: 90.46333312988281, Test Loss: 0.28906407952308655, Test Accuracy: 89.6300048828125
Epoch 4, Loss: 0.23640312254428864, Accuracy: 91.3183364868164, Test Loss: 0.27847766876220703, Test Accuracy: 89.91000366210938
Epoch 5, Loss: 0.21524685621261597, Accuracy: 92.04166412353516, Test Loss: 0.2885028123855591, Test Accuracy: 89.3800048828125
Epoch 6, Loss: 0.19979551434516907, Accuracy: 92.59832763671875, Test Loss: 0.26059702038764954, Test Accuracy: 90.61000061035156
Epoch 7, Loss: 0.1840706169605255, Accuracy: 93.17832946777344, Test Loss: 0.2663477957248688, Test Accuracy: 90.62000274658203
Epoch 8, Loss: 0.1705845296382904, Accuracy: 93.61499786376953, Test Loss: 0.26093336939811707, Tes

최신 체크포인트를 불러와 테스트하기

In [28]:
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name = 'eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)

In [30]:
@tf.function
def eval_step(images, labels):
  predictions = new_model(images,training= False)
  eval_accuracy(labels, predictions)

In [31]:
checkpoint = tf.train.Checkpoint(optimizer = new_optimizer, model = new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print('전략을 사용하지 않고, 저장된 모델을 복원한 후의 정확도 : {}'.format(eval_accuracy.result()*100))

전략을 사용하지 않고, 저장된 모델을 복원한 후의 정확도 : 90.93000030517578


데이터셋에 대해 반복작업을 하는 다른 방법들

In [32]:
# iterator 사용

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss : {}, Accuracy : {}")
  print(template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()

Epoch 10, Loss : 0.15513095259666443, Accuracy : 94.53125
Epoch 10, Loss : 0.1335621476173401, Accuracy : 96.09375
Epoch 10, Loss : 0.11737672239542007, Accuracy : 95.625
Epoch 10, Loss : 0.12078237533569336, Accuracy : 96.25
Epoch 10, Loss : 0.11946958303451538, Accuracy : 96.25
Epoch 10, Loss : 0.13709188997745514, Accuracy : 94.53125
Epoch 10, Loss : 0.13406801223754883, Accuracy : 95.3125
Epoch 10, Loss : 0.1214999109506607, Accuracy : 95.78125
Epoch 10, Loss : 0.1460639387369156, Accuracy : 93.90625
Epoch 10, Loss : 0.12815073132514954, Accuracy : 95.15625


In [36]:
# tf.function 내부에서 반복하기

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args = (x,))
    total_loss += strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
  num_batches += 1
  return total_loss / tf.cast(num_batches, dtype = tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()

  '"`sparse_categorical_crossentropy` received `from_logits=True`, but '


Epoch 1, Loss: 125.27386474609375, Accuracy: 95.00333404541016
Epoch 2, Loss: 116.18745422363281, Accuracy: 95.34833526611328
Epoch 3, Loss: 104.72651672363281, Accuracy: 95.82333374023438
Epoch 4, Loss: 97.30291748046875, Accuracy: 96.04833221435547
Epoch 5, Loss: 92.0727767944336, Accuracy: 96.27833557128906
Epoch 6, Loss: 82.01773834228516, Accuracy: 96.74166107177734
Epoch 7, Loss: 76.8344955444336, Accuracy: 96.94499969482422
Epoch 8, Loss: 71.19281768798828, Accuracy: 97.2066650390625
Epoch 9, Loss: 65.19236755371094, Accuracy: 97.413330078125
Epoch 10, Loss: 60.38823318481445, Accuracy: 97.61499786376953
