**Chapter 19 – Huấn luyện và Triển khai Mô hình TensorFlow trên Quy mô lớn**

_Notebook này chứa toàn bộ mã nguồn mẫu và lời giải bài tập trong Chương 19._

<table align="left">
  <td>
    <a href="https://colab.research.google.com/github/ageron/handson-ml2/blob/master/19_training_and_deploying_at_scale.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
  </td>
  <td>
    <a target="_blank" href="https://kaggle.com/kernels/welcome?src=https://github.com/ageron/handson-ml2/blob/master/19_training_and_deploying_at_scale.ipynb"><img src="https://kaggle.com/static/images/open-in-kaggle.svg" /></a>
  </td>
</table>

# Cài đặt
Đầu tiên hãy nhập một vài mô-đun thông dụng, đảm bảo rằng Matplotlib sẽ vẽ đồ thị ngay trong notebook, và chuẩn bị một hàm để lưu đồ thị. Ta cũng kiểm tra xem Python phiên bản từ 3.5 trở lên đã được cài đặt hay chưa (mặc dù Python 2.x vẫn có thể hoạt động, phiên bản này đã không còn sử dụng nên chúng tôi rất khuyến khích việc sử dụng Python 3), cũng như Scikit-Learn ≥ 0.20.


In [1]:
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Is this notebook running on Colab or Kaggle?
IS_COLAB = "google.colab" in sys.modules
IS_KAGGLE = "kaggle_secrets" in sys.modules

if IS_COLAB or IS_KAGGLE:
    !echo "deb http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" > /etc/apt/sources.list.d/tensorflow-serving.list
    !curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | apt-key add -
    !apt update && apt-get install -y tensorflow-model-server
    %pip install -q -U tensorflow-serving-api

# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"

# TensorFlow ≥2.0 is required
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

if not tf.config.list_physical_devices('GPU'):
    print("No GPU was detected. CNNs can be very slow without a GPU.")
    if IS_COLAB:
        print("Go to Runtime > Change runtime and select a GPU hardware accelerator.")
    if IS_KAGGLE:
        print("Go to Settings > Accelerator and select GPU.")

# Common imports
import numpy as np
import os

# to make this notebook's output stable across runs
np.random.seed(42)
tf.random.set_seed(42)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "deploy"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

# Triển khai mô hình TensorFlow lên TensorFlow Serving (TFS)
Ta sẽ sử dụng REST API hoặc gRPC API.

## Lưu/Nạp một `SavedModel`

In [2]:
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train_full = X_train_full[..., np.newaxis].astype(np.float32) / 255.
X_test = X_test[..., np.newaxis].astype(np.float32) / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
X_new = X_test[:3]

In [3]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(100, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(learning_rate=1e-2),
              metrics=["accuracy"])
model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe3b8718590>

In [4]:
np.round(model.predict(X_new), 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

In [5]:
model_version = "0001"
model_name = "my_mnist_model"
model_path = os.path.join(model_name, model_version)
model_path

'my_mnist_model/0001'

In [6]:
import shutil

shutil.rmtree(model_name)

In [7]:
tf.saved_model.save(model, model_path)

INFO:tensorflow:Assets written to: my_mnist_model/0001/assets


In [8]:
for root, dirs, files in os.walk(model_name):
    indent = '    ' * root.count(os.sep)
    print('{}{}/'.format(indent, os.path.basename(root)))
    for filename in files:
        print('{}{}'.format(indent + '    ', filename))

my_mnist_model/
    0001/
        saved_model.pb
        variables/
            variables.data-00000-of-00001
            variables.index
        assets/


In [9]:
!saved_model_cli show --dir {model_path}

The given SavedModel contains the following tag-sets:
'serve'


In [10]:
!saved_model_cli show --dir {model_path} --tag_set serve

The given SavedModel MetaGraphDef contains SignatureDefs with the following keys:
SignatureDef key: "__saved_model_init_op"
SignatureDef key: "serving_default"


In [11]:
!saved_model_cli show --dir {model_path} --tag_set serve \
                      --signature_def serving_default

The given SavedModel SignatureDef contains the following input(s):
  inputs['flatten_input'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 28, 28, 1)
      name: serving_default_flatten_input:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['dense_1'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 10)
      name: StatefulPartitionedCall:0
Method name is: tensorflow/serving/predict


In [12]:
!saved_model_cli show --dir {model_path} --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['__saved_model_init_op']:
  The given SavedModel SignatureDef contains the following input(s):
  The given SavedModel SignatureDef contains the following output(s):
    outputs['__saved_model_init_op'] tensor_info:
        dtype: DT_INVALID
        shape: unknown_rank
        name: NoOp
  Method name is: 

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['flatten_input'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 28, 28, 1)
        name: serving_default_flatten_input:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['dense_1'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 10)
        name: StatefulPartitionedCall:0
  Method name is: tensorflow/serving/predict

Defined Functions:
  Function Name: '__call__'
    Option #1
      Callable with:
        Argument #1
          flatte

Hãy viết các mẫu mới vào một file `npy` để có thể dễ dàng đưa chúng vào mô hình:

In [13]:
np.save("my_mnist_tests.npy", X_new)

In [14]:
input_name = model.input_names[0]
input_name

'flatten_input'

Và giờ hãy dùng `saved_model_cli` để đưa ra dự đoán cho các mẫu mà ta vừa lưu:

In [15]:
!saved_model_cli run --dir {model_path} --tag_set serve \
                     --signature_def serving_default    \
                     --inputs {input_name}=my_mnist_tests.npy

2021-02-18 22:15:30.294109: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-18 22:15:30.294306: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.loader.load or tf.compat.v1.saved_model.load. There will be a new function for importing SavedModels in Tensorflow 2.0.
INFO:tensorflow:Restoring parameters from my_mnist_model/0001/variables/variables
2021-02-18 22:15:30.323498: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:196] None of the MLIR optimization passes are enabled (registered 0 passes)
Result for output key dense_

In [16]:
np.round([[1.1347984e-04, 1.5187356e-07, 9.7032893e-04, 2.7640699e-03, 3.7826971e-06,
           7.6876910e-05, 3.9140293e-08, 9.9559116e-01, 5.3502394e-05, 4.2665208e-04],
          [8.2443521e-04, 3.5493889e-05, 9.8826385e-01, 7.0466995e-03, 1.2957400e-07,
           2.3389691e-04, 2.5639210e-03, 9.5886099e-10, 1.0314899e-03, 8.7952529e-08],
          [4.4693781e-05, 9.7028232e-01, 9.0526715e-03, 2.2641101e-03, 4.8766597e-04,
           2.8800720e-03, 2.2714981e-03, 8.3753867e-03, 4.0439744e-03, 2.9759688e-04]], 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

## TensorFlow Serving

Hãy cài [Docker](https://docs.docker.com/install/) nếu bạn chưa có nó trên máy. Sau đó chạy:

```bash
docker pull tensorflow/serving

export ML_PATH=$HOME/ml # or wherever this project is
docker run -it --rm -p 8500:8500 -p 8501:8501 \
   -v "$ML_PATH/my_mnist_model:/models/my_mnist_model" \
   -e MODEL_NAME=my_mnist_model \
   tensorflow/serving
```
Sau khi xong việc, hãy nhấn Ctrl-C để tắt server.

Nhưng nếu `tensorflow_model_server` đã được cài sẵn (ví dụ, nếu bạn đang chạy notebook này trên Colab), thì 3 cell sau sẽ khởi động server:

In [17]:
os.environ["MODEL_DIR"] = os.path.split(os.path.abspath(model_path))[0]

In [18]:
%%bash --bg
nohup tensorflow_model_server \
     --rest_api_port=8501 \
     --model_name=my_mnist_model \
     --model_base_path="${MODEL_DIR}" >server.log 2>&1

In [19]:
!tail server.log

2021-02-16 22:33:09.323538: I external/org_tensorflow/tensorflow/cc/saved_model/reader.cc:93] Reading SavedModel debug info (if present) from: /models/my_mnist_model/0001
2021-02-16 22:33:09.323642: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-16 22:33:09.360572: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:206] Restoring SavedModel bundle.
2021-02-16 22:33:09.361764: I external/org_tensorflow/tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2200000000 Hz
2021-02-16 22:33:09.387713: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:190] Running initialization op on SavedModel bundle at path: /models/my_mnist_model/0001
2021-02-16 22:33:09.

In [20]:
import json

input_data_json = json.dumps({
    "signature_name": "serving_default",
    "instances": X_new.tolist(),
})

In [21]:
repr(input_data_json)[:1500] + "..."

'\'{"signature_name": "serving_default", "instances": [[[[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0

Giờ hãy sử dụng REST API của TensorFlow Serving để đưa ra dự đoán:

In [22]:
import requests

SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'
response = requests.post(SERVER_URL, data=input_data_json)
response.raise_for_status() # raise an exception in case of error
response = response.json()

In [23]:
response.keys()

dict_keys(['predictions'])

In [24]:
y_proba = np.array(response["predictions"])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

### Sử dụng gRPC API

In [25]:
from tensorflow_serving.apis.predict_pb2 import PredictRequest

request = PredictRequest()
request.model_spec.name = model_name
request.model_spec.signature_name = "serving_default"
input_name = model.input_names[0]
request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))

In [26]:
import grpc
from tensorflow_serving.apis import prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)
response = predict_service.Predict(request, timeout=10.0)

In [27]:
response

outputs {
  key: "dense_1"
  value {
    dtype: DT_FLOAT
    tensor_shape {
      dim {
        size: 3
      }
      dim {
        size: 10
      }
    }
    float_val: 0.00011425172124290839
    float_val: 1.513665068841874e-07
    float_val: 0.0009818424005061388
    float_val: 0.0027773496694862843
    float_val: 3.758880893656169e-06
    float_val: 7.6266449468676e-05
    float_val: 3.9139514740327286e-08
    float_val: 0.995561957359314
    float_val: 5.344580131350085e-05
    float_val: 0.00043088122038170695
    float_val: 0.0008194865076802671
    float_val: 3.5498320357874036e-05
    float_val: 0.9882420897483826
    float_val: 0.00705744931474328
    float_val: 1.2937064752804872e-07
    float_val: 0.00023402832448482513
    float_val: 0.0025743397418409586
    float_val: 9.668431610876382e-10
    float_val: 0.0010369382798671722
    float_val: 8.833576004008137e-08
    float_val: 4.441547571332194e-05
    float_val: 0.970328688621521
    float_val: 0.009044423699378967
    

Chuyển đổi response thành một tensor:

In [28]:
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
y_proba = tf.make_ndarray(outputs_proto)
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

Hoặc thành một mảng NumPy nếu máy của bạn không có thư viện TensorFlow:

In [29]:
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
shape = [dim.size for dim in outputs_proto.tensor_shape.dim]
y_proba = np.array(outputs_proto.float_val).reshape(shape)
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

## Triển khai phiên bản mô hình mới

In [30]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(50, activation="relu"),
    keras.layers.Dense(50, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(learning_rate=1e-2),
              metrics=["accuracy"])
history = model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [31]:
model_version = "0002"
model_name = "my_mnist_model"
model_path = os.path.join(model_name, model_version)
model_path

'my_mnist_model/0002'

In [32]:
tf.saved_model.save(model, model_path)

INFO:tensorflow:Assets written to: my_mnist_model/0002/assets


In [33]:
for root, dirs, files in os.walk(model_name):
    indent = '    ' * root.count(os.sep)
    print('{}{}/'.format(indent, os.path.basename(root)))
    for filename in files:
        print('{}{}'.format(indent + '    ', filename))

my_mnist_model/
    0001/
        saved_model.pb
        variables/
            variables.data-00000-of-00001
            variables.index
        assets/
    0002/
        saved_model.pb
        variables/
            variables.data-00000-of-00001
            variables.index
        assets/


**Lưu ý**: Bạn có thể sẽ phải đợi một phút để TensorFlow serving nạp xong mô hình mới.

In [34]:
import requests

SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'
            
response = requests.post(SERVER_URL, data=input_data_json)
response.raise_for_status()
response = response.json()

In [35]:
response.keys()

dict_keys(['predictions'])

In [36]:
y_proba = np.array(response["predictions"])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.99, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]])

# Triển khai mô hình trên Google Cloud AI Platform

Hãy làm theo hướng dẫn trong sách để triển khai mô hình trên Google Cloud AI Platform, tải về khóa bí mật của tài khoản dịch vụ và lưu nó vào tệp `my_service_account_private_key.json` trong thư mục dự án. Ngoài ra, hãy cập nhật `project_id`:

In [37]:
project_id = "onyx-smoke-242003"

In [38]:
import googleapiclient.discovery

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "my_service_account_private_key.json"
model_id = "my_mnist_model"
model_path = "projects/{}/models/{}".format(project_id, model_id)
model_path += "/versions/v0001/" # if you want to run a specific version
ml_resource = googleapiclient.discovery.build("ml", "v1").projects()

In [39]:
def predict(X):
    input_data_json = {"signature_name": "serving_default",
                       "instances": X.tolist()}
    request = ml_resource.predict(name=model_path, body=input_data_json)
    response = request.execute()
    if "error" in response:
        raise RuntimeError(response["error"])
    return np.array([pred[output_name] for pred in response["predictions"]])

In [40]:
Y_probas = predict(X_new)
np.round(Y_probas, 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.99, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]])

# Sử dụng GPU

**Ghi chú**: `tf.test.is_gpu_available()` đã bị deprecated. Thay vào đó, hãy sử dụng `tf.config.list_physical_devices('GPU')`.

In [41]:
#tf.test.is_gpu_available() # deprecated
tf.config.list_physical_devices('GPU')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'),
 PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU')]

In [42]:
tf.test.gpu_device_name()

'/device:GPU:0'

In [43]:
tf.test.is_built_with_cuda()

True

In [44]:
from tensorflow.python.client.device_lib import list_local_devices

devices = list_local_devices()
devices

[name: "/device:CPU:0"
 device_type: "CPU"
 memory_limit: 268435456
 locality {
 }
 incarnation: 7325002731160755624,
 name: "/device:GPU:0"
 device_type: "GPU"
 memory_limit: 11139884032
 locality {
   bus_id: 1
   links {
     link {
       device_id: 1
       type: "StreamExecutor"
       strength: 1
     }
   }
 }
 incarnation: 7150956550266107441
 physical_device_desc: "device: 0, name: Tesla K80, pci bus id: 0000:00:04.0, compute capability: 3.7",
 name: "/device:GPU:1"
 device_type: "GPU"
 memory_limit: 11139884032
 locality {
   bus_id: 1
   links {
     link {
       type: "StreamExecutor"
       strength: 1
     }
   }
 }
 incarnation: 15909479382059415698
 physical_device_desc: "device: 1, name: Tesla K80, pci bus id: 0000:00:05.0, compute capability: 3.7"]

# Huấn luyện Phân tán

In [45]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

In [46]:
def create_model():
    return keras.models.Sequential([
        keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
                            padding="same", input_shape=[28, 28, 1]),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"), 
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Flatten(),
        keras.layers.Dense(units=64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(units=10, activation='softmax'),
    ])

In [47]:
batch_size = 100
model = create_model()
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(learning_rate=1e-2),
              metrics=["accuracy"])
model.fit(X_train, y_train, epochs=10,
          validation_data=(X_valid, y_valid), batch_size=batch_size)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f014831c110>

In [48]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

distribution = tf.distribute.MirroredStrategy()

# Change the default all-reduce algorithm:
#distribution = tf.distribute.MirroredStrategy(
#    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

# Specify the list of GPUs to use:
#distribution = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

# Use the central storage strategy instead:
#distribution = tf.distribute.experimental.CentralStorageStrategy()

#if IS_COLAB and "COLAB_TPU_ADDR" in os.environ:
#  tpu_address = "grpc://" + os.environ["COLAB_TPU_ADDR"]
#else:
#  tpu_address = ""
#resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu_address)
#tf.config.experimental_connect_to_cluster(resolver)
#tf.tpu.experimental.initialize_tpu_system(resolver)
#distribution = tf.distribute.experimental.TPUStrategy(resolver)

with distribution.scope():
    model = create_model()
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=keras.optimizers.SGD(learning_rate=1e-2),
                  metrics=["accuracy"])

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


In [49]:
batch_size = 100 # must be divisible by the number of workers
model.fit(X_train, y_train, epochs=10,
          validation_data=(X_valid, y_valid), batch_size=batch_size)

Epoch 1/10
INFO:tensorflow:batch_all_reduce: 10 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:batch_all_reduce: 10 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/t

<tensorflow.python.keras.callbacks.History at 0x7f259bf1a6d0>

In [50]:
model.predict(X_new)

array([[2.53707055e-10, 7.94509292e-10, 1.02021443e-06, 3.37102080e-08,
        4.90816797e-11, 4.37713789e-11, 2.43314297e-14, 9.99996424e-01,
        1.50591750e-09, 2.50736753e-06],
       [1.11715025e-07, 8.56921833e-05, 9.99914169e-01, 6.31697228e-09,
        3.99949344e-11, 4.47976906e-10, 8.46022008e-09, 3.03771834e-08,
        2.91782563e-08, 1.95555502e-10],
       [4.68117065e-07, 9.99787748e-01, 1.01387537e-04, 2.87393277e-06,
        5.29725839e-05, 1.55926125e-06, 2.07211669e-05, 1.76809226e-05,
        9.37155255e-06, 5.19965897e-06]], dtype=float32)

Vòng lặp huấn luyện tùy chỉnh:

In [51]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

K = keras.backend

distribution = tf.distribute.MirroredStrategy()

with distribution.scope():
    model = create_model()
    optimizer = keras.optimizers.SGD()

with distribution.scope():
    dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat().batch(batch_size)
    input_iterator = distribution.make_dataset_iterator(dataset)
    
@tf.function
def train_step():
    def step_fn(inputs):
        X, y = inputs
        with tf.GradientTape() as tape:
            Y_proba = model(X)
            loss = K.sum(keras.losses.sparse_categorical_crossentropy(y, Y_proba)) / batch_size

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        return loss

    per_replica_losses = distribution.experimental_run(step_fn, input_iterator)
    mean_loss = distribution.reduce(tf.distribute.ReduceOp.SUM,
                                    per_replica_losses, axis=None)
    return mean_loss

n_epochs = 10
with distribution.scope():
    input_iterator.initialize()
    for epoch in range(n_epochs):
        print("Epoch {}/{}".format(epoch + 1, n_epochs))
        for iteration in range(len(X_train) // batch_size):
            print("\rLoss: {:.3f}".format(train_step().numpy()), end="")
        print()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
Instructions for updating:
Use the iterator's `initializer` property instead.
Epoch 1/10
INFO:tensorflow:batch_all_reduce: 10 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:batch_all_reduce: 10 all-reduces with algorithm = nccl, num_packs = 1
Loss: 0.380
Epoch 2/10
Loss: 0.302
Epoch 3/10
Loss: 0.285
Epoch 4/10
Loss: 0.294
Epoch 5/10
Loss: 0.304
Epoch 6/10
Loss: 0.310
Epoch 7/10
Loss: 0.310
Epoch 8/10
Loss: 0.306
Epoch 9/10
Loss: 0.303
Epoch 10/10
Loss: 0.298


## Huấn luyện trên nhiều máy chủ

Một cụm TensorFlow là một nhóm các tiến trình TensorFlow đang chạy song song, thường là trên các máy khác nhau, và đang giao tiếp với nhau để hoàn thành một công việc nào đó, ví dụ như huấn luyện hoặc chạy một mạng nơ-ron. Mỗi tiến trình TF trong cụm được gọi là một "tác vụ" (hay "máy chủ TF"). Nó có một địa chỉ IP, một cổng và một kiểu (còn được gọi là vai trò hay công việc). Kiểu của tác vụ có thể là `"worker"`, `"chief"`, `"ps"` (parameter server -- máy chủ tham số) hay `"evaluator"`:
* Mỗi **worker** sẽ thực hiện tính toán, thường trên máy có một hoặc nhiều GPU.
* **chief** cũng sẽ thực hiện tính toán, nhưng nó còn làm thêm những việc như viết các bản ghi Tensorboard hoặc lưu các checkpoint. Chỉ có một chief duy nhất trong một cluster, và thường đó là worker đầu tiên (tức worker #0).
* Một **máy chủ tham số** (**parameter server** -- ps) sẽ chỉ theo dõi giá trị của các biến, và thường nằm trên một máy chỉ có CPU.
* Hiển nhiên thì **evaluator** sẽ đảm nhiệm việc đánh giá. Thường chỉ có một evaluator duy nhất trong một cluster.

Một tập hợp các tác vụ có cùng kiểu thường được gọi là "job". Ví dụ, job "worker" là tập hợp tất cả các worker.

Để khởi động một cụm TensorFlow, đầu tiên ta cần định nghĩa nó. Ta sẽ phải xác định rõ mọi tác vụ (địa chỉ IP, cổng TCP và kiểu). Ví dụ, thông số cụm như sau sẽ định nghĩa một cụm có 3 tác vụ (2 worker và 1 máy chủ tham số). Đây là một từ điển với một khóa cho mỗi job, và các giá trị là các danh sách chứa địa chỉ của tác vụ:

In [52]:
cluster_spec = {
    "worker": [
        "machine-a.example.com:2222",  # /job:worker/task:0
        "machine-b.example.com:2222"   # /job:worker/task:1
    ],
    "ps": ["machine-c.example.com:2222"] # /job:ps/task:0
}

Mỗi tác vụ trong cụm có thể giao tiếp với mọi tác vụ khác trong máy chủ, vì vậy hãy đảm bảo rằng bạn đã cài đặt tường lửa để cho phép các máy giao tiếp với nhau trên các cổng đó (thường sẽ đơn giản hơn nếu bạn dùng cùng một cổng trên mọi máy).

Khi một tác vụ bắt đầu, nó cần phải biết bản thân nó là tác vụ nào: nó cần phải biết kiểu và chỉ số của nó (chỉ số tác vụ còn được gọi là id tác vụ). Một cách phổ biến để chỉ định mọi thứ cùng một lúc (cả thông số cụm cùng với kiểu và id của tác vụ hiện tại) là khai báo biến môi trường `TF_CONFIG` trước khi chạy chương trình. Biến này phải nhận một từ điển được biểu diễn dưới dạng JSON chứa thông số cụm (trong khóa `"cluster"`), cùng với kiểu và chỉ số của tác vụ đang bắt đầu (trong khóa `"task"`). Ví dụ, biến môi trường `TF_CONFIG` sau định nghĩa cụm giống như trên, với 2 worker và 1 máy chủ tham số, và chỉ định worker #1 là tác vụ bắt đầu:

In [53]:
import os
import json

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": cluster_spec,
    "task": {"type": "worker", "index": 1}
})
os.environ["TF_CONFIG"]

'{"cluster": {"worker": ["machine-a.example.com:2222", "machine-b.example.com:2222"], "ps": ["machine-c.example.com:2222"]}, "task": {"type": "worker", "index": 1}}'

Một số nền tảng (ví dụ như Google Cloud ML Engine) tự động khai báo biến môi trường này giúp bạn.

Lớp `TFConfigClusterResolver` của TensorFlow đọc thông số cụm từ biến môi trường này:

In [54]:
import tensorflow as tf

resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
resolver.cluster_spec()

ClusterSpec({'ps': ['machine-c.example.com:2222'], 'worker': ['machine-a.example.com:2222', 'machine-b.example.com:2222']})

In [55]:
resolver.task_type

'worker'

In [56]:
resolver.task_id

1

Bây giờ hãy chạy một cụm đơn giản với chỉ hai tác vụ worker, và cả hai đều chạy trên máy cục bộ. Ta sẽ dùng `MultiWorkerMirroredStrategy` để huấn luyện một mô hình thông qua hai tác vụ này.

Bước đầu tiên là viết mã huấn luyện. Vì đoạn mã này sẽ được dùng để chạy cả hai worker, và mỗi worker sẽ chạy trong tiến trình của riêng nó, ta sẽ viết đoạn mã này trong một file Python riêng biệt `my_mnist_multiworker_task.py`. Đoạn mã này khá dễ hiểu, nhưng có một vài điều quan trọng cần lưu ý:
* Ta khởi tạo `MultiWorkerMirroredStrategy` trước khi làm bất kỳ điều gì khác với TensorFlow.
* Chỉ một worker đảm nhiệm việc viết vào bản ghi TensorBoard và lưu checkpoint. Như đã nói ở trên, worker này được gọi là *chief*, và theo quy ước thì đó thường là worker #0.

In [57]:
%%writefile my_mnist_multiworker_task.py

import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
import time

# At the beginning of the program
distribution = tf.distribute.MultiWorkerMirroredStrategy()

resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
print("Starting task {}{}".format(resolver.task_type, resolver.task_id))

# Only worker #0 will write checkpoints and log to TensorBoard
if resolver.task_id == 0:
    root_logdir = os.path.join(os.curdir, "my_mnist_multiworker_logs")
    run_id = time.strftime("run_%Y_%m_%d-%H_%M_%S")
    run_dir = os.path.join(root_logdir, run_id)
    callbacks = [
        keras.callbacks.TensorBoard(run_dir),
        keras.callbacks.ModelCheckpoint("my_mnist_multiworker_model.h5",
                                        save_best_only=True),
    ]
else:
    callbacks = []

# Load and prepare the MNIST dataset
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train_full = X_train_full[..., np.newaxis] / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

with distribution.scope():
    model = keras.models.Sequential([
        keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
                            padding="same", input_shape=[28, 28, 1]),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"), 
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Flatten(),
        keras.layers.Dense(units=64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(units=10, activation='softmax'),
    ])
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=keras.optimizers.SGD(learning_rate=1e-2),
                  metrics=["accuracy"])

model.fit(X_train, y_train, validation_data=(X_valid, y_valid),
          epochs=10, callbacks=callbacks)

Overwriting my_mnist_multiworker_task.py


Trong ứng dụng thực tế, thường sẽ có một worker duy nhất cho mỗi máy, nhưng trong ví dụ này ta đang chạy cả hai worker trên cùng một máy, nên chúng sẽ đều cố sử dụng hết tất cả lượng GPU RAM khả dụng (nếu máy có GPU), và khả năng cao ta sẽ gặp lỗi Out-Of-Memory (OOM). Để tránh lỗi này, ta có thể dùng biến môi trường `CUDA_VISIBLE_DEVICES` để phân một GPU riêng cho mỗi worker. Ngoài ra, ta cũng có thể đơn thuần vô hiệu hóa việc dùng GPU như sau:

In [58]:
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Giờ ta đã sẵn sàng để khởi động cả hai worker, mỗi worker trong một tiến trình riêng, bằng cách sử dụng mô-đun `subprocess` của Python. Trước khi bắt đầu các tiến trình, ta cần khai báo đúng biến môi trường `TF_CONFIG`, và chỉ thay đổi chỉ số tác vụ:

In [59]:
import subprocess

cluster_spec = {"worker": ["127.0.0.1:9901", "127.0.0.1:9902"]}

for index, worker_address in enumerate(cluster_spec["worker"]):
    os.environ["TF_CONFIG"] = json.dumps({
        "cluster": cluster_spec,
        "task": {"type": "worker", "index": index}
    })
    subprocess.Popen("python my_mnist_multiworker_task.py", shell=True)

Vậy là xong! Cụm TensorFlow giờ đang chạy, nhưng ta không thể thấy nó trong notebook này vì nó đang chạy trong các tiến trình riêng biệt (nhưng nếu bạn đang chạy notebook này trong Jupyter, bạn có thể thấy bản ghi của worker nằm trong bản ghi máy chủ của Jupyter).

Vì chief (worker #0) đang viết vào TensorBoard, ta sẽ dùng TensorBoard để theo dõi quá trình huấn luyện. Hãy chạy cell dưới, rồi nhấn vào nút cài đặt (biểu tượng bánh răng) trong giao diện TensorBoard và tích vào ô "Reload data" để khiến TensorBoard tự động tải lại sau mỗi 30 giây. Sau khi hoàn thành epoch huấn luyện đầu tiên (có thể mất một vài phút), và TensorBoard đã tải lại, thẻ SCALARS sẽ xuất hiện. Hãy nhấn vào thẻ này để theo dõi độ chính xác huấn luyện và kiểm định của mô hình.

In [60]:
%load_ext tensorboard
%tensorboard --logdir=./my_mnist_multiworker_logs --port=6006

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


Chỉ vậy thôi! Sau khi huấn luyện xong, checkpoint tốt nhất của mô hình nằm ở tệp `my_mnist_multiworker_model.h5`. Ta có thể nạp nó bằng `keras.models.load_model()` và đưa ra dự đoán như thường lệ:

In [61]:
from tensorflow import keras

model = keras.models.load_model("my_mnist_multiworker_model.h5")
Y_pred = model.predict(X_new)
np.argmax(Y_pred, axis=-1)

array([7, 2, 1])

Đó là tất cả cho ngày hôm nay! Mong rằng bạn thấy notebook này hữu ích. 😊

# Lời giải Bài tập

## 1. đến 8.

Xem Phụ lục A.

## 9.
_Bài tập: Huấn luyện một mô hình (bất kỳ mô hình nào bạn thích) và triển khai nó lên TF Serving hoặc Google Cloud AI Platform. Hãy viết mã cho máy khách để truy cập mô hình bằng REST API hoặc gRPC API. Cập nhật mô hình và triển khai phiên bản mới. Mã mãy khách của bạn giờ sẽ truy cập phiên bản mới. Khôi phục mô hình về phiên bản đầu tiên._

Hãy làm theo các bước trong phần [Triển khai mô hình TensorFlow lên TensorFlow Serving (TFS)](#Triển-khai-mô-hình-TensorFlow-lên-TensorFlow-Serving-(TFS)) phía trên.

# 10.
_Bài tập: Huấn luyện một mô hình bất kỳ bằng nhiều GPU trên cùng một máy bằng cách sử dụng `MirroredStrategy` (nếu không có GPU, bạn có thể dùng Colaboratory với GPU Runtime và tạo hai GPU ảo). Huấn luyện lại mô hình bằng `CentralStorageStrategy` và so sánh thời gian huấn luyện._

Hãy làm theo các bước trong phần [Huấn luyện Phân tán](#Huấn-luyện-Phân-tán) phía trên.

# 11.
_Bài tập: Huấn luyện một mô hình nhỏ trên Google Cloud AI Platform, sử dụng phương pháp tinh chỉnh siêu tham số black box._

Hãy làm theo hướng dẫn ở trang 716-717 (TODO: sửa theo bản render) trong sách. Bạn cũng có thể đọc [trang tài liệu này](https://cloud.google.com/ai-platform/training/docs/hyperparameter-tuning-overview) và thử làm theo ví dụ trong [bài viết này](https://towardsdatascience.com/how-to-do-bayesian-hyper-parameter-tuning-on-a-blackbox-model-882009552c6d) của Lak Lakshmanan.