<a href="https://colab.research.google.com/github/talkin24/jaxflax_lab/blob/main/Parallel_training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Ensembling on multiple devices

앙상블의 크기가 사용 가능한 디바이스 수와 동일한 MNIST 데이터 세트에서 CNN의 앙상블을 훈련하는 방법을 보여드리겠습니다. 간단히 설명하면 다음과 같습니다:

- `jax.pmap()`을 사용하여 여러 함수를 병렬로 만듭니다,

- 랜덤 시드를 분할하여 다른 매개변수 초기화를 얻습니다,

- 입력을 복제하고 필요한 경우 출력을 복제 해제합니다,

- 예측을 계산하기 위해 여러 기기에서 평균 확률을 계산합니다.

이 하우투에서는 임포트, CNN 모듈, 메트릭 계산과 같은 일부 코드를 생략했지만 이러한 코드는 MNIST 예제에서 찾을 수 있습니다.

## Parallel functions

먼저 모델의 초기 파라미터를 검색하는 `create_train_state()`의 병렬 버전을 생성합니다. 이 작업은 `jax.pmap()`을 사용하여 수행합니다. 함수를 "pmap"하는 효과는 함수를 XLA로 컴파일하지만(jax.jit()와 유사), XLA 디바이스(예: GPU/TPU)에서 병렬로 실행하는 것입니다.

In [None]:
# Single-model
def create_train_state(rng, learning_rate, momentum):
  cnn = CNN()
  params = cnn.init(rng, jnp.ones([1, 28, 28, 1]))['params']
  tx = optax.sgd(learning_rate, momentum)
  return train_state.TrainState.create(
      apply_fn=cnn.apply, params=params, tx=tx)
  

# Ensemble
@functools.partial(jax.pmap, static_broadcasted_argnums=(1, 2)) #####
def create_train_state(rng, learning_rate, momentum):
  cnn = CNN()
  params = cnn.init(rng, jnp.ones([1, 28, 28, 1]))['params']
  tx = optax.sgd(learning_rate, momentum)
  return train_state.TrainState.create(
      apply_fn=cnn.apply, params=params, tx=tx)

위의 단일 모델 코드의 경우 `jax.jit()`를 사용하여 모델을 느리게 초기화합니다. 

앙상블의 경우 `jax.pmap()`은 기본적으로 제공된 인수 `rng`의 첫 번째 축에 매핑되므로 나중에 이 함수를 호출할 때 각 디바이스에 대해 다른 값을 제공해야 합니다.

또한 `learning_rate`와 `momentum`을 정적 인자로 지정하여 추상적인 모양이 아닌 이 인자의 구체적인 값을 사용하도록 지정한 점에 유의하세요. 이는 제공된 인수가 스칼라 값이기 때문에 필요합니다. 

다음으로 `apply_model()` 및 `update_model()` 함수에 대해서도 동일한 작업을 수행합니다. 

앙상블에서 예측을 계산하기 위해 개별 확률의 평균을 취합니다. `jax.lax.pmean()`을 사용하여 여러 기기에서 평균을 계산합니다. 이를 위해서는 `jax.pmap()` 및 `jax.lax.pmean()` 모두에 axis_name을 지정해야 합니다.

In [None]:
#Single-model
@jax.jit #####
def apply_model(state, images, labels):
  def loss_fn(params):
    logits = CNN().apply({'params': params}, images)
    one_hot = jax.nn.one_hot(labels, 10)
    loss = optax.softmax_cross_entropy(logits=logits, labels=one_hot).mean()
    return loss, logits

  grad_fn = jax.value_and_grad(loss_fn, has_aux=True)
  (loss, logits), grads = grad_fn(state.params)

  accuracy = jnp.mean(jnp.argmax(logits, -1) == labels) #####
  return grads, loss, accuracy

@jax.jit #####
def update_model(state, grads):
  return state.apply_gradients(grads=grads)


# Ensemble
@functools.partial(jax.pmap, axis_name='ensemble') #####
def apply_model(state, images, labels):
  def loss_fn(params):
    logits = CNN().apply({'params': params}, images)
    one_hot = jax.nn.one_hot(labels, 10)
    loss = optax.softmax_cross_entropy(logits=logits, labels=one_hot).mean()
    return loss, logits

  grad_fn = jax.value_and_grad(loss_fn, has_aux=True)
  (loss, logits), grads = grad_fn(state.params)
  probs = jax.lax.pmean(jax.nn.softmax(logits), axis_name='ensemble') #####
  accuracy = jnp.mean(jnp.argmax(probs, -1) == labels) #####
  return grads, loss, accuracy

@jax.pmap #####
def update_model(state, grads):
  return state.apply_gradients(grads=grads)

# Training the Ensemble

다음으로 `train_epoch()` 함수를 변환합니다.

위에서 pmapped 함수를 호출할 때 필요한 경우 모든 디바이스에 대한 인수를 복제하고 반환값의 중복을 제거하는 작업을 주로 처리해야 합니다.

In [None]:
# Single-model
def train_epoch(state, train_ds, batch_size, rng):
  train_ds_size = len(train_ds['image'])
  steps_per_epoch = train_ds_size // batch_size

  perms = jax.random.permutation(rng, len(train_ds['image']))
  perms = perms[:steps_per_epoch * batch_size]
  perms = perms.reshape((steps_per_epoch, batch_size))

  epoch_loss = []
  epoch_accuracy = []

  for perm in perms:
    batch_images = train_ds['image'][perm, ...] #####
    batch_labels = train_ds['label'][perm, ...] #####
    grads, loss, accuracy = apply_model(state, batch_images, batch_labels)
    state = update_model(state, grads)
    epoch_loss.append(loss) #####
    epoch_accuracy.append(accuracy) #####
  train_loss = np.mean(epoch_loss)
  train_accuracy = np.mean(epoch_accuracy)
  return state, train_loss, train_accuracy


# Ensemble
def train_epoch(state, train_ds, batch_size, rng):
  train_ds_size = len(train_ds['image'])
  steps_per_epoch = train_ds_size // batch_size

  perms = jax.random.permutation(rng, len(train_ds['image']))
  perms = perms[:steps_per_epoch * batch_size]
  perms = perms.reshape((steps_per_epoch, batch_size))

  epoch_loss = []
  epoch_accuracy = []

  for perm in perms:
    batch_images = jax_utils.replicate(train_ds['image'][perm, ...]) #####
    batch_labels = jax_utils.replicate(train_ds['label'][perm, ...]) #####
    grads, loss, accuracy = apply_model(state, batch_images, batch_labels)
    state = update_model(state, grads)
    epoch_loss.append(jax_utils.unreplicate(loss)) #####
    epoch_accuracy.append(jax_utils.unreplicate(accuracy)) #####
  train_loss = np.mean(epoch_loss)
  train_accuracy = np.mean(epoch_accuracy)
  return state, train_loss, train_accuracy

보시다시피 `state`와 관련된 로직을 변경할 필요가 없습니다. 

아래 훈련 코드에서 볼 수 있듯이 train state는 이미 복제되어 있으므로 `train_step()`에 전달하면 `train_step()`이 pmap되어 있기 때문에 정상적으로 작동하기 때문입니다. 그러나 훈련 데이터 세트는 아직 복제되지 않았으므로 여기서 리플리케이션을 수행합니다. 전체 train 데이터 집합을 복제하는 것은 너무 많은 메모리를 사용하므로 배치 수준에서 수행합니다.

이제 실제 훈련 로직을 다시 작성할 수 있습니다. 이는 두 가지 간단한 변경 사항으로 구성됩니다. RNG를 `create_train_state()`에 전달할 때 복제되는지 확인하는 것과 전체 데이터 세트에 대해 직접 이 작업을 수행할 수 있도록 훈련 데이터 세트보다 훨씬 작은 테스트 데이터 세트를 리플리케이트하는 것입니다.

In [None]:
# Single-model
train_ds, test_ds = get_datasets()

rng = jax.random.PRNGKey(0)

rng, init_rng = jax.random.split(rng)
state = create_train_state(init_rng, learning_rate, momentum) #####


for epoch in range(1, num_epochs + 1):
  rng, input_rng = jax.random.split(rng)
  state, train_loss, train_accuracy = train_epoch(
      state, train_ds, batch_size, input_rng)

  _, test_loss, test_accuracy = apply_model( #####
      state, test_ds['image'], test_ds['label']) #####

  logging.info(
      'epoch:% 3d, train_loss: %.4f, train_accuracy: %.2f, '
      'test_loss: %.4f, test_accuracy: %.2f'
      % (epoch, train_loss, train_accuracy * 100, test_loss,
         test_accuracy * 100))


# Ensemble
train_ds, test_ds = get_datasets()
test_ds = jax_utils.replicate(test_ds) #####
rng = jax.random.PRNGKey(0)

rng, init_rng = jax.random.split(rng)
state = create_train_state(jax.random.split(init_rng, jax.device_count()), #####
                           learning_rate, momentum) #####

for epoch in range(1, num_epochs + 1):
  rng, input_rng = jax.random.split(rng)
  state, train_loss, train_accuracy = train_epoch(
      state, train_ds, batch_size, input_rng)

  _, test_loss, test_accuracy = jax_utils.unreplicate( #####
      apply_model(state, test_ds['image'], test_ds['label'])) #####

  logging.info(
      'epoch:% 3d, train_loss: %.4f, train_accuracy: %.2f, '
      'test_loss: %.4f, test_accuracy: %.2f'
      % (epoch, train_loss, train_accuracy * 100, test_loss,
         test_accuracy * 100))