In [None]:
# 파이썬 ≥3.5 필수
import sys
assert sys.version_info >= (3, 5)

# 사이킷런 ≥0.20 필수
import sklearn
assert sklearn.__version__ >= "0.20"

try:
    # %tensorflow_version은 코랩에만 있습니다.
    %tensorflow_version 2.x
    !echo "deb http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" > /etc/apt/sources.list.d/tensorflow-serving.list
    !curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | apt-key add -
    !apt update && apt-get install -y tensorflow-model-server
    %pip install -q -U tensorflow-serving-api
    IS_COLAB = True
except Exception:
    IS_COLAB = False

# 텐서플로 ≥2.0 필수
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

if not tf.config.list_physical_devices('GPU'):
    print("감지된 GPU가 없습니다. GPU가 없으면 LSTM과 CNN이 매우 느릴 수 있습니다.")
    if IS_COLAB:
        print("런타임 > 런타임 유형 변경 메뉴를 선택하고 하드웨어 가속기로 GPU를 고르세요.")

# 공통 모듈 임포트
import numpy as np
import os

# 노트북 실행 결과를 동일하게 유지하기 위해
np.random.seed(42)
tf.random.set_seed(42)

# 깔끔한 그래프 출력을 위해
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# 그림을 저장할 위치
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "deploy"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("그림 저장", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

No GPU was detected. CNNs can be very slow without a GPU.


# 텐서플로 서빙(TFS)으로 텐서플로 모델 배포하기

REST API나 gRPC API를 사용합니다.

## `SavedModel` 저장과 로딩

In [None]:
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train_full = X_train_full[..., np.newaxis].astype(np.float32) / 255.
X_test = X_test[..., np.newaxis].astype(np.float32) / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
X_new = X_test[:3]

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(100, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(learning_rate=1e-2),
              metrics=["accuracy"])
model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe3b8718590>

In [None]:
np.round(model.predict(X_new), 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

In [None]:
model_version = "0001"
model_name = "my_mnist_model"
model_path = os.path.join(model_name, model_version)
model_path

'my_mnist_model/0001'

In [None]:
!rm -rf {model_name}

In [None]:
tf.saved_model.save(model, model_path)

INFO:tensorflow:Assets written to: my_mnist_model/0001/assets


In [None]:
for root, dirs, files in os.walk(model_name):
    indent = '    ' * root.count(os.sep)
    print('{}{}/'.format(indent, os.path.basename(root)))
    for filename in files:
        print('{}{}'.format(indent + '    ', filename))

my_mnist_model/
    0001/
        saved_model.pb
        variables/
            variables.data-00000-of-00001
            variables.index
        assets/


In [None]:
!saved_model_cli show --dir {model_path}

The given SavedModel contains the following tag-sets:
'serve'


In [None]:
!saved_model_cli show --dir {model_path} --tag_set serve

The given SavedModel MetaGraphDef contains SignatureDefs with the following keys:
SignatureDef key: "__saved_model_init_op"
SignatureDef key: "serving_default"


In [None]:
!saved_model_cli show --dir {model_path} --tag_set serve \
                      --signature_def serving_default

The given SavedModel SignatureDef contains the following input(s):
  inputs['flatten_input'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 28, 28, 1)
      name: serving_default_flatten_input:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['dense_1'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 10)
      name: StatefulPartitionedCall:0
Method name is: tensorflow/serving/predict


In [None]:
!saved_model_cli show --dir {model_path} --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['__saved_model_init_op']:
  The given SavedModel SignatureDef contains the following input(s):
  The given SavedModel SignatureDef contains the following output(s):
    outputs['__saved_model_init_op'] tensor_info:
        dtype: DT_INVALID
        shape: unknown_rank
        name: NoOp
  Method name is: 

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['flatten_input'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 28, 28, 1)
        name: serving_default_flatten_input:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['dense_1'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 10)
        name: StatefulPartitionedCall:0
  Method name is: tensorflow/serving/predict

Defined Functions:
  Function Name: '__call__'
    Option #1
      Callable with:
        Argument #1
          flatte

X_new를 `npy` 파일로 만들면 모델에 쉽게 전달할 수 있습니다:

In [None]:
np.save("my_mnist_tests.npy", X_new)

In [None]:
input_name = model.input_names[0]
input_name

'flatten_input'

그리고 이제 `saved_model_cli`를 사용해 방금 저장한 샘플에 대한 예측을 만듭니다:

In [None]:
!saved_model_cli run --dir {model_path} --tag_set serve \
                     --signature_def serving_default    \
                     --inputs {input_name}=my_mnist_tests.npy

2021-02-18 22:15:30.294109: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-18 22:15:30.294306: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.loader.load or tf.compat.v1.saved_model.load. There will be a new function for importing SavedModels in Tensorflow 2.0.
INFO:tensorflow:Restoring parameters from my_mnist_model/0001/variables/variables
2021-02-18 22:15:30.323498: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:196] None of the MLIR optimization passes are enabled (registered 0 passes)
Result for output key dense_

In [None]:
np.round([[1.1347984e-04, 1.5187356e-07, 9.7032893e-04, 2.7640699e-03, 3.7826971e-06,
           7.6876910e-05, 3.9140293e-08, 9.9559116e-01, 5.3502394e-05, 4.2665208e-04],
          [8.2443521e-04, 3.5493889e-05, 9.8826385e-01, 7.0466995e-03, 1.2957400e-07,
           2.3389691e-04, 2.5639210e-03, 9.5886099e-10, 1.0314899e-03, 8.7952529e-08],
          [4.4693781e-05, 9.7028232e-01, 9.0526715e-03, 2.2641101e-03, 4.8766597e-04,
           2.8800720e-03, 2.2714981e-03, 8.3753867e-03, 4.0439744e-03, 2.9759688e-04]], 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

## 텐서플로 서빙

[도커](https://docs.docker.com/install/)가 없다면 설치하세요. 그리고 다음을 실행하세요:

```bash
docker pull tensorflow/serving

export ML_PATH=$HOME/ml # or wherever this project is
docker run -it --rm -p 8500:8500 -p 8501:8501 \
   -v "$ML_PATH/my_mnist_model:/models/my_mnist_model" \
   -e MODEL_NAME=my_mnist_model \
   tensorflow/serving
```

사용이 끝나면 Ctrl-C를 눌러 서버를 종료하세요.

또는 `tensorflow_model_server`가 설치되어 있다면 (예를 들어, 이 노트북을 코랩에서 실행하는 경우) 다음 세 개의 셀을 실행하여 서버를 시작하세요:

In [None]:
os.environ["MODEL_DIR"] = os.path.split(os.path.abspath(model_path))[0]

In [None]:
%%bash --bg
nohup tensorflow_model_server \
     --rest_api_port=8501 \
     --model_name=my_mnist_model \
     --model_base_path="${MODEL_DIR}" >server.log 2>&1

In [None]:
!tail server.log

2021-02-16 22:33:09.323538: I external/org_tensorflow/tensorflow/cc/saved_model/reader.cc:93] Reading SavedModel debug info (if present) from: /models/my_mnist_model/0001
2021-02-16 22:33:09.323642: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-16 22:33:09.360572: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:206] Restoring SavedModel bundle.
2021-02-16 22:33:09.361764: I external/org_tensorflow/tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2200000000 Hz
2021-02-16 22:33:09.387713: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:190] Running initialization op on SavedModel bundle at path: /models/my_mnist_model/0001
2021-02-16 22:33:09.

In [None]:
import json

input_data_json = json.dumps({
    "signature_name": "serving_default",
    "instances": X_new.tolist(),
})

In [None]:
repr(input_data_json)[:1500] + "..."

'\'{"signature_name": "serving_default", "instances": [[[[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0

이제 텐서플로 서빙의 REST API를 사용해 예측을 만들어 보죠:

In [None]:
import requests

SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'
response = requests.post(SERVER_URL, data=input_data_json)
response.raise_for_status() # raise an exception in case of error
response = response.json()

In [None]:
response.keys()

dict_keys(['predictions'])

In [None]:
y_proba = np.array(response["predictions"])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

### gRPC API 사용하기

In [None]:
from tensorflow_serving.apis.predict_pb2 import PredictRequest

request = PredictRequest()
request.model_spec.name = model_name
request.model_spec.signature_name = "serving_default"
input_name = model.input_names[0]
request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))

In [None]:
import grpc
from tensorflow_serving.apis import prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)
response = predict_service.Predict(request, timeout=10.0)

In [None]:
response

outputs {
  key: "dense_1"
  value {
    dtype: DT_FLOAT
    tensor_shape {
      dim {
        size: 3
      }
      dim {
        size: 10
      }
    }
    float_val: 0.00011425172124290839
    float_val: 1.513665068841874e-07
    float_val: 0.0009818424005061388
    float_val: 0.0027773496694862843
    float_val: 3.758880893656169e-06
    float_val: 7.6266449468676e-05
    float_val: 3.9139514740327286e-08
    float_val: 0.995561957359314
    float_val: 5.344580131350085e-05
    float_val: 0.00043088122038170695
    float_val: 0.0008194865076802671
    float_val: 3.5498320357874036e-05
    float_val: 0.9882420897483826
    float_val: 0.00705744931474328
    float_val: 1.2937064752804872e-07
    float_val: 0.00023402832448482513
    float_val: 0.0025743397418409586
    float_val: 9.668431610876382e-10
    float_val: 0.0010369382798671722
    float_val: 8.833576004008137e-08
    float_val: 4.441547571332194e-05
    float_val: 0.970328688621521
    float_val: 0.009044423699378967
    

응답을 텐서로 변환합니다:

In [None]:
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
y_proba = tf.make_ndarray(outputs_proto)
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

클라이언트가 텐서플로 라이브러리를 사용하지 않는다면 넘파이 배열로 변환합니다:

In [None]:
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
shape = [dim.size for dim in outputs_proto.tensor_shape.dim]
y_proba = np.array(outputs_proto.float_val).reshape(shape)
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

## 새로운 버전의 모델 배포하기

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(50, activation="relu"),
    keras.layers.Dense(50, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(learning_rate=1e-2),
              metrics=["accuracy"])
history = model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
model_version = "0002"
model_name = "my_mnist_model"
model_path = os.path.join(model_name, model_version)
model_path

'my_mnist_model/0002'

In [None]:
tf.saved_model.save(model, model_path)

INFO:tensorflow:Assets written to: my_mnist_model/0002/assets


In [None]:
for root, dirs, files in os.walk(model_name):
    indent = '    ' * root.count(os.sep)
    print('{}{}/'.format(indent, os.path.basename(root)))
    for filename in files:
        print('{}{}'.format(indent + '    ', filename))

my_mnist_model/
    0001/
        saved_model.pb
        variables/
            variables.data-00000-of-00001
            variables.index
        assets/
    0002/
        saved_model.pb
        variables/
            variables.data-00000-of-00001
            variables.index
        assets/


**경고**: 새로운 모델이 텐서플로 서빙에 로드되기 전까지 잠시 기다려야 할 수 있습니다:

In [None]:
import requests

SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'
            
response = requests.post(SERVER_URL, data=input_data_json)
response.raise_for_status()
response = response.json()

In [None]:
response.keys()

dict_keys(['predictions'])

In [None]:
y_proba = np.array(response["predictions"])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.99, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]])

# 구글 클라우드 AI 플랫폼에 모델 배포하기

구글 클라우드 AI 플랫폼에 모델을 배포하는 책의 안내를 따르고, 서비스 계정의 개인키를 다운로드하여 프로젝트 디렉토리에 있는 `my_service_account_private_key.json` 파일에 저장하세요. 또한 `project_id`를 업데이트하세요:

In [None]:
project_id = "onyx-smoke-242003"

In [None]:
import googleapiclient.discovery

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "my_service_account_private_key.json"
model_id = "my_mnist_model"
model_path = "projects/{}/models/{}".format(project_id, model_id)
model_path += "/versions/v0001/" # 특정 버전을 실행하고 싶다면
ml_resource = googleapiclient.discovery.build("ml", "v1").projects()

In [None]:
def predict(X):
    input_data_json = {"signature_name": "serving_default",
                       "instances": X.tolist()}
    request = ml_resource.predict(name=model_path, body=input_data_json)
    response = request.execute()
    if "error" in response:
        raise RuntimeError(response["error"])
    return np.array([pred[output_name] for pred in response["predictions"]])

In [None]:
Y_probas = predict(X_new)
np.round(Y_probas, 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.99, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]])

# GPU 사용하기

**노트**: `tf.test.is_gpu_available()`는 deprecated 되었습니다. 대신 `tf.config.list_physical_devices('GPU')`를 사용하세요.

In [None]:
#tf.test.is_gpu_available() # deprecated
tf.config.list_physical_devices('GPU')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'),
 PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU')]

In [None]:
tf.test.gpu_device_name()

'/device:GPU:0'

In [None]:
tf.test.is_built_with_cuda()

True

In [None]:
from tensorflow.python.client.device_lib import list_local_devices

devices = list_local_devices()
devices

[name: "/device:CPU:0"
 device_type: "CPU"
 memory_limit: 268435456
 locality {
 }
 incarnation: 7325002731160755624,
 name: "/device:GPU:0"
 device_type: "GPU"
 memory_limit: 11139884032
 locality {
   bus_id: 1
   links {
     link {
       device_id: 1
       type: "StreamExecutor"
       strength: 1
     }
   }
 }
 incarnation: 7150956550266107441
 physical_device_desc: "device: 0, name: Tesla K80, pci bus id: 0000:00:04.0, compute capability: 3.7",
 name: "/device:GPU:1"
 device_type: "GPU"
 memory_limit: 11139884032
 locality {
   bus_id: 1
   links {
     link {
       type: "StreamExecutor"
       strength: 1
     }
   }
 }
 incarnation: 15909479382059415698
 physical_device_desc: "device: 1, name: Tesla K80, pci bus id: 0000:00:05.0, compute capability: 3.7"]

# 분산 훈련

In [None]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

In [None]:
def create_model():
    return keras.models.Sequential([
        keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
                            padding="same", input_shape=[28, 28, 1]),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"), 
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Flatten(),
        keras.layers.Dense(units=64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(units=10, activation='softmax'),
    ])

In [None]:
batch_size = 100
model = create_model()
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(learning_rate=1e-2),
              metrics=["accuracy"])
model.fit(X_train, y_train, epochs=10,
          validation_data=(X_valid, y_valid), batch_size=batch_size)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f014831c110>

In [None]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

distribution = tf.distribute.MirroredStrategy()

# Change the default all-reduce algorithm:
#distribution = tf.distribute.MirroredStrategy(
#    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

# Specify the list of GPUs to use:
#distribution = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

# Use the central storage strategy instead:
#distribution = tf.distribute.experimental.CentralStorageStrategy()

#if IS_COLAB and "COLAB_TPU_ADDR" in os.environ:
#  tpu_address = "grpc://" + os.environ["COLAB_TPU_ADDR"]
#else:
#  tpu_address = ""
#resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu_address)
#tf.config.experimental_connect_to_cluster(resolver)
#tf.tpu.experimental.initialize_tpu_system(resolver)
#distribution = tf.distribute.experimental.TPUStrategy(resolver)

with distribution.scope():
    model = create_model()
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=keras.optimizers.SGD(learning_rate=1e-2),
                  metrics=["accuracy"])

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


In [None]:
batch_size = 100 # must be divisible by the number of workers
model.fit(X_train, y_train, epochs=10,
          validation_data=(X_valid, y_valid), batch_size=batch_size)

Epoch 1/10
INFO:tensorflow:batch_all_reduce: 10 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:batch_all_reduce: 10 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/t

<tensorflow.python.keras.callbacks.History at 0x7f259bf1a6d0>

In [None]:
model.predict(X_new)

array([[2.53707055e-10, 7.94509292e-10, 1.02021443e-06, 3.37102080e-08,
        4.90816797e-11, 4.37713789e-11, 2.43314297e-14, 9.99996424e-01,
        1.50591750e-09, 2.50736753e-06],
       [1.11715025e-07, 8.56921833e-05, 9.99914169e-01, 6.31697228e-09,
        3.99949344e-11, 4.47976906e-10, 8.46022008e-09, 3.03771834e-08,
        2.91782563e-08, 1.95555502e-10],
       [4.68117065e-07, 9.99787748e-01, 1.01387537e-04, 2.87393277e-06,
        5.29725839e-05, 1.55926125e-06, 2.07211669e-05, 1.76809226e-05,
        9.37155255e-06, 5.19965897e-06]], dtype=float32)

사용자 정의 훈련 루프:

In [None]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

K = keras.backend

distribution = tf.distribute.MirroredStrategy()

with distribution.scope():
    model = create_model()
    optimizer = keras.optimizers.SGD()

with distribution.scope():
    dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat().batch(batch_size)
    input_iterator = distribution.make_dataset_iterator(dataset)
    
@tf.function
def train_step():
    def step_fn(inputs):
        X, y = inputs
        with tf.GradientTape() as tape:
            Y_proba = model(X)
            loss = K.sum(keras.losses.sparse_categorical_crossentropy(y, Y_proba)) / batch_size

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        return loss

    per_replica_losses = distribution.experimental_run(step_fn, input_iterator)
    mean_loss = distribution.reduce(tf.distribute.ReduceOp.SUM,
                                    per_replica_losses, axis=None)
    return mean_loss

n_epochs = 10
with distribution.scope():
    input_iterator.initialize()
    for epoch in range(n_epochs):
        print("Epoch {}/{}".format(epoch + 1, n_epochs))
        for iteration in range(len(X_train) // batch_size):
            print("\rLoss: {:.3f}".format(train_step().numpy()), end="")
        print()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
Instructions for updating:
Use the iterator's `initializer` property instead.
Epoch 1/10
INFO:tensorflow:batch_all_reduce: 10 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:batch_all_reduce: 10 all-reduces with algorithm = nccl, num_packs = 1
Loss: 0.380
Epoch 2/10
Loss: 0.302
Epoch 3/10
Loss: 0.285
Epoch 4/10
Loss: 0.294
Epoch 5/10
Loss: 0.304
Epoch 6/10
Loss: 0.310
Epoch 7/10
Loss: 0.310
Epoch 8/10
Loss: 0.306
Epoch 9/10
Loss: 0.303
Epoch 10/10
Loss: 0.298


## 여러 서버에서 훈련하기

텐서플로 클러스터는 일반적으로 여러 서버에서 병렬로 실행되는 텐서플로 프로세스의 그룹입니다. 신경망을 훈련하거나 실행하는 작업을 완료하기 위해 프로세스가 서로 대화합니다. 클러스터에 있는 개별 TF 프로세스를 "태스크"라고 부릅니다(또는 "TF 서버"). 태스크는 IP 주소, 포트, 타입(또는 역할이나 잡(job)으로 부릅니다). 타입은 `"worker"`, `"chief"`, `"ps"`(파라미터 서버), `"evaluator"`가 있습니다:
* 각 **워커**는 일반적으로 하나 이상의 GPU를 가진 머신에서 계산을 수행합니다.
* **치프**도 계산을 수행합니다. 하지만 텐서 보드 로그를 기록하거나 체크포인트를 저장하는 등의 추가적인 작업을 처리합니다. 클러스터에는 하나의 치프가 있고 일반적으로 첫 번째 워커가 치프입니다(즉, 워커 #0).
* **파라미터 서버**(ps)는 변수 값만 가지고 있습니다. 일반적으로 CPU만 가진 머신입니다.
* **evaluator**는 평가를 담당합ㄴ디ㅏ. 일반적으로 클러스터 내에 하나의 evaluator가 있습니다.

동일한 타입을 공유하는 작업을 종종 "잡"(job)이라고 부릅니다. 예를 들어, "워커" 잡은 모든 워커의 집합입니다.

텐서플로 클러스터를 시작하려면 먼저 이를 정의해야 합니다. 모든 태스크(IP 주소, TCP 포트, 타입)를 지정한다는 것을 의미합니다. 예를 들어 다음 클러스터 명세는 세 개의 태스크로 구성된 클러스터를 정의합니다(두 대의 워커와 한 대의 파라미터 서버). 잡마다 하나의 키를 가진 딕셔너리이며 값은 태스크 주소의 리스트입니다:

In [None]:
cluster_spec = {
    "worker": [
        "machine-a.example.com:2222",  # /job:worker/task:0
        "machine-b.example.com:2222"   # /job:worker/task:1
    ],
    "ps": ["machine-c.example.com:2222"] # /job:ps/task:0
}

클러스터에 있는 각 태스크는 서버에 있는 다른 태스크와 통신할 수 있습니다. 따라서 해당 머신의 포트 사이에 모든 통신이 가능하도록 방화벽을 설정해야 합니다(모든 머신에서 동일한 포트를 사용하면 간단히 설정할 수 있습니다).

태스크가 시작될 때, 타입과 인덱스(태스크 인덱스를 태스크 아이디라고도 합니다)를 알려야 합니다. 한 번에 (클러스터 스펙과 현재 작업의 타입, 아이디를) 모두 정의하는 일반적인 방법은 프로그램을 시작하기 전에 `TF_CONFIG` 환경 변수를 설정하는 것입니다. (`"cluster"` 키 아래) 클러스터 스펙과 (`"task"` 키 아래) 시작할 태스크의 타입과 인덱스를 담은 JSON으로 인코딩된 딕셔너리입니다. 예를 들어 다음 `TF_CONFIG` 환경 변수는 위와 동일한 클러스터를 정의합니다. 두 대의 워커와 한 대의 파라미터 서버, 그리고 시작할 태스크는 워커 #1입니다:

In [None]:
import os
import json

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": cluster_spec,
    "task": {"type": "worker", "index": 1}
})
os.environ["TF_CONFIG"]

'{"cluster": {"worker": ["machine-a.example.com:2222", "machine-b.example.com:2222"], "ps": ["machine-c.example.com:2222"]}, "task": {"type": "worker", "index": 1}}'

일부 플랫폼(예를 들면, 구글 클라우드 ML 엔진)은 자동으로 이런 환경을 설정합니다.

텐서플로의 `TFConfigClusterResolver` 클래스는 환경 변수에서 클러스터 스펙을 읽습니다:

In [None]:
import tensorflow as tf

resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
resolver.cluster_spec()

ClusterSpec({'ps': ['machine-c.example.com:2222'], 'worker': ['machine-a.example.com:2222', 'machine-b.example.com:2222']})

In [None]:
resolver.task_type

'worker'

In [None]:
resolver.task_id

1

이제 간단한 클러스터를 시작해 보죠. 두 개의 워커 태스크를 로컬 머신에서 실행합니다. `MultiWorkerMirroredStrategy`를 사용해 두 태스크에서 모델을 훈련하겠습니다.

첫 번째 단계로 훈련 코드를 작성합니다. 이 코드는 자체 프로세스를 가진 두 워커를 실행하는데 사용되기 때문에 별도의 파이썬 파일 `my_mnist_multiworker_task.py`로 이 코드를 저장합니다. 이 코드는 비교적 간단하지만 몇 가지 중요한 점이 있습니다:
* 텐서플로로 무엇을 하기전에 `MultiWorkerMirroredStrategy`를 생성합니다.
* 워커 중 한 대에서 텐서보드 로깅과 체크포인트 저장을 담당합니다. 앞서 언급한대로 이 워커를 *치프* 라고 부릅니다. 관례적으로 워커 #0입니다.

In [None]:
%%writefile my_mnist_multiworker_task.py

import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
import time

# 프로그램 시작 부분에
distribution = tf.distribute.MultiWorkerMirroredStrategy()

resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
print("Starting task {}{}".format(resolver.task_type, resolver.task_id))

# 워커 #0이 체크포인트 저장과 텐서보드 로깅을 수행합니다
if resolver.task_id == 0:
    root_logdir = os.path.join(os.curdir, "my_mnist_multiworker_logs")
    run_id = time.strftime("run_%Y_%m_%d-%H_%M_%S")
    run_dir = os.path.join(root_logdir, run_id)
    callbacks = [
        keras.callbacks.TensorBoard(run_dir),
        keras.callbacks.ModelCheckpoint("my_mnist_multiworker_model.h5",
                                        save_best_only=True),
    ]
else:
    callbacks = []

# MNIST 데이터셋을 로드하고 준비합니다
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train_full = X_train_full[..., np.newaxis] / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

with distribution.scope():
    model = keras.models.Sequential([
        keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
                            padding="same", input_shape=[28, 28, 1]),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"), 
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Flatten(),
        keras.layers.Dense(units=64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(units=10, activation='softmax'),
    ])
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=keras.optimizers.SGD(learning_rate=1e-2),
                  metrics=["accuracy"])

model.fit(X_train, y_train, validation_data=(X_valid, y_valid),
          epochs=10, callbacks=callbacks)

Overwriting my_mnist_multiworker_task.py


실제 애플리케이션에서는 일반적으로 머신마다 하나의 워커가 있지만 이 예에서는 동일한 머신에 두 워커를 실행합니다. 따라서 (GPU가 있다면) 두 워커가 모두 가용한 GPU 램을 사용하려고 하기 때문에 메모리 부족 에러가 날 수 있습니다. 이를 피하려면 `CUDA_VISIBLE_DEVICES` 환경 변수를 사용해 워커마다 다른 GPU를 할당할 수 있습니다. 또는 다음처럼 간단하게 GPU를 사용하지 않게 설정할 수 있습니다:

In [None]:
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

이제 파이썬의 `subprocess` 모델을 사용해 두 워커를 각각의 개별 프로세스로 시작할 준비가 되었습니다. 프로세스를 시작하기 전에 `TF_CONFIG` 환경 변수에 태스크 인덱스를 적절히 설정해야 합니다:

In [None]:
import subprocess

cluster_spec = {"worker": ["127.0.0.1:9901", "127.0.0.1:9902"]}

for index, worker_address in enumerate(cluster_spec["worker"]):
    os.environ["TF_CONFIG"] = json.dumps({
        "cluster": cluster_spec,
        "task": {"type": "worker", "index": index}
    })
    subprocess.Popen("python my_mnist_multiworker_task.py", shell=True)

이제 됐습니다! 텐서플로 클러스터가 실행되었습니다. 하지만 별도의 프로세스로 실행되기 때문에 이 노트북에서는 볼 수 없습니다(하지만 이 노트북을 주피터에서 실행한다면 주피터 서버 로그에서 워커 로그를 볼 수 있습니다).

치프(워커 #0)가 텐서보드 로그를 작성하기 때문에 텐서보드로 훈련 과정을 볼 수 있습니다. 다음 셀을 실행하고 텐서보드 인터페이스의 설정(setting) 버튼을 누르고, "Reload data" 체크박스를 선택하면 텐서보드가 자동으로 30초마다 리프레시됩니다. 첫 번째 훈련 에포크가 끝나고 (몇 분 걸립니다) 텐서보드가 리프레시되면 SCALARS 탭이 나타날 것입니다. 이 탭을 클릭하고 모델의 훈련과 검증 정확도를 확인하세요.

In [None]:
%load_ext tensorboard
%tensorboard --logdir=./my_mnist_multiworker_logs --port=6006

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


훈련이 끝나면 최상의 모델 체크포인트가 `my_mnist_multiworker_model.h5` 파일에 저장됩니다. `keras.models.load_model()`를 사용해 이를 로드하고 예측에 사용할 수 있습니다:

In [None]:
from tensorflow import keras

model = keras.models.load_model("my_mnist_multiworker_model.h5")
Y_pred = model.predict(X_new)
np.argmax(Y_pred, axis=-1)

array([7, 2, 1])

이 장의 노트북은 여기까지입니다! 이 내용이 도움이 되었으면 좋겠습니다. 😊

# 연습문제 해답

## 1. to 8.

부록 A 참조

## 9.
_연습문제: (원하는 어떤 모델이든) 모델을 훈련하고 TF 서빙이나 구글 클라우드 AI 플랫폼에 배포해보세요. REST API나 gRPC API를 사용해 쿼리하는 클라이언트 코드를 작성해보세요. 모델을 업데이트하고 새로운 버전을 배포해보세요. 클라이언트 코드가 새로운 버전으로 쿼리할 것입니다. 첫 번째 버전으로 롤백해보세요._

<a href="#텐서플로-서빙(TFS)으로-텐서플로-모델-배포하기">텐서플로 서빙(TFS)으로 텐서플로 모델 배포하기</a> 절에 있는 단계를 따라해 보세요.

# 10.
_연습문제: 하나의 머신에 여러 개의 GPU에서 `MirroredStrategy` 전략으로 모델을 훈련해보세요(GPU를 준비하지 못하면 코랩의 GPU 런타임을 사용하여 가상 GPU 2개를 만들 수 있습니다). `CentralStorageStrategy` 전략으로 모델을 다시 훈련하고 훈련 시간을 비교해보세요._

[분산 훈련](#분산-훈련) 절에 있는 단계를 따라해 보세요.

# 11.
_연습문제: 구글 클라우드 AI 플랫폼에서 블랙 박스 하이퍼파라미터 튜닝을 사용해 작은 모델을 훈련해보세요._

책의 837~838 페이지에 있는 내용을 따라해 보세요. 또는 [이 문서](https://cloud.google.com/ai-platform/training/docs/hyperparameter-tuning-overview)를 읽고 Lak Lakshmanan가 쓴 훌륭한 [블로그 포스트](https://towardsdatascience.com/how-to-do-bayesian-hyper-parameter-tuning-on-a-blackbox-model-882009552c6d)에 있는 예를 따라해 보세요.