<a href="https://colab.research.google.com/github/kinaya18/Hands-on-Machine-Learning-with-Scikit-Learn-Keras-TensorFlow/blob/main/Neural%20Networks%20and%20Deep%20Learning/Chapter_19.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Chapter 19 – Training and Deploying TensorFlow Models at Scale**

In [None]:
import sys
assert sys.version_info >= (3, 5)

IS_COLAB = "google.colab" in sys.modules
IS_KAGGLE = "kaggle_secrets" in sys.modules

if IS_COLAB or IS_KAGGLE:
    !echo "deb http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" > /etc/apt/sources.list.d/tensorflow-serving.list
    !curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | apt-key add -
    !apt update && apt-get install -y tensorflow-model-server
    %pip install -q -U tensorflow-serving-api

import sklearn
assert sklearn.__version__ >= "0.20"

import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

if not tf.config.list_physical_devices('GPU'):
    print("No GPU was detected. CNNs can be very slow without a GPU.")
    if IS_COLAB:
        print("Go to Runtime > Change runtime and select a GPU hardware accelerator.")
    if IS_KAGGLE:
        print("Go to Settings > Accelerator and select GPU.")

import numpy as np
import os

np.random.seed(42)
tf.random.set_seed(42)

%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

PROJECT_ROOT_DIR = "."
CHAPTER_ID = "deploy"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

# Deploying TensorFlow models to TensorFlow Serving (TFS)


## Save/Load a `SavedModel`

In [None]:
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train_full = X_train_full[..., np.newaxis].astype(np.float32) / 255.
X_test = X_test[..., np.newaxis].astype(np.float32) / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
X_new = X_test[:3]

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(100, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(learning_rate=1e-2),
              metrics=["accuracy"])
model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe3b8718590>

In [None]:
np.round(model.predict(X_new), 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

In [None]:
model_version = "0001"
model_name = "my_mnist_model"
model_path = os.path.join(model_name, model_version)
model_path

'my_mnist_model/0001'

In [None]:
import shutil

shutil.rmtree(model_name)

In [None]:
tf.saved_model.save(model, model_path)

INFO:tensorflow:Assets written to: my_mnist_model/0001/assets


In [None]:
for root, dirs, files in os.walk(model_name):
    indent = '    ' * root.count(os.sep)
    print('{}{}/'.format(indent, os.path.basename(root)))
    for filename in files:
        print('{}{}'.format(indent + '    ', filename))

my_mnist_model/
    0001/
        saved_model.pb
        variables/
            variables.data-00000-of-00001
            variables.index
        assets/


In [None]:
!saved_model_cli show --dir {model_path}

The given SavedModel contains the following tag-sets:
'serve'


In [None]:
!saved_model_cli show --dir {model_path} --tag_set serve

The given SavedModel MetaGraphDef contains SignatureDefs with the following keys:
SignatureDef key: "__saved_model_init_op"
SignatureDef key: "serving_default"


In [None]:
!saved_model_cli show --dir {model_path} --tag_set serve \
                      --signature_def serving_default

The given SavedModel SignatureDef contains the following input(s):
  inputs['flatten_input'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 28, 28, 1)
      name: serving_default_flatten_input:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['dense_1'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 10)
      name: StatefulPartitionedCall:0
Method name is: tensorflow/serving/predict


In [None]:
!saved_model_cli show --dir {model_path} --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['__saved_model_init_op']:
  The given SavedModel SignatureDef contains the following input(s):
  The given SavedModel SignatureDef contains the following output(s):
    outputs['__saved_model_init_op'] tensor_info:
        dtype: DT_INVALID
        shape: unknown_rank
        name: NoOp
  Method name is: 

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['flatten_input'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 28, 28, 1)
        name: serving_default_flatten_input:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['dense_1'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 10)
        name: StatefulPartitionedCall:0
  Method name is: tensorflow/serving/predict

Defined Functions:
  Function Name: '__call__'
    Option #1
      Callable with:
        Argument #1
          flatte

In [None]:
np.save("my_mnist_tests.npy", X_new)

In [None]:
input_name = model.input_names[0]
input_name

'flatten_input'

In [None]:
!saved_model_cli run --dir {model_path} --tag_set serve \
                     --signature_def serving_default    \
                     --inputs {input_name}=my_mnist_tests.npy

2021-02-18 22:15:30.294109: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-18 22:15:30.294306: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.loader.load or tf.compat.v1.saved_model.load. There will be a new function for importing SavedModels in Tensorflow 2.0.
INFO:tensorflow:Restoring parameters from my_mnist_model/0001/variables/variables
2021-02-18 22:15:30.323498: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:196] None of the MLIR optimization passes are enabled (registered 0 passes)
Result for output key dense_

In [None]:
np.round([[1.1347984e-04, 1.5187356e-07, 9.7032893e-04, 2.7640699e-03, 3.7826971e-06,
           7.6876910e-05, 3.9140293e-08, 9.9559116e-01, 5.3502394e-05, 4.2665208e-04],
          [8.2443521e-04, 3.5493889e-05, 9.8826385e-01, 7.0466995e-03, 1.2957400e-07,
           2.3389691e-04, 2.5639210e-03, 9.5886099e-10, 1.0314899e-03, 8.7952529e-08],
          [4.4693781e-05, 9.7028232e-01, 9.0526715e-03, 2.2641101e-03, 4.8766597e-04,
           2.8800720e-03, 2.2714981e-03, 8.3753867e-03, 4.0439744e-03, 2.9759688e-04]], 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

## TensorFlow Serving

In [None]:
os.environ["MODEL_DIR"] = os.path.split(os.path.abspath(model_path))[0]

In [None]:
%%bash --bg
nohup tensorflow_model_server \
     --rest_api_port=8501 \
     --model_name=my_mnist_model \
     --model_base_path="${MODEL_DIR}" >server.log 2>&1

In [None]:
!tail server.log

2021-02-16 22:33:09.323538: I external/org_tensorflow/tensorflow/cc/saved_model/reader.cc:93] Reading SavedModel debug info (if present) from: /models/my_mnist_model/0001
2021-02-16 22:33:09.323642: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-16 22:33:09.360572: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:206] Restoring SavedModel bundle.
2021-02-16 22:33:09.361764: I external/org_tensorflow/tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2200000000 Hz
2021-02-16 22:33:09.387713: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:190] Running initialization op on SavedModel bundle at path: /models/my_mnist_model/0001
2021-02-16 22:33:09.

In [None]:
import json

input_data_json = json.dumps({
    "signature_name": "serving_default",
    "instances": X_new.tolist(),
})

In [None]:
repr(input_data_json)[:1500] + "..."

'\'{"signature_name": "serving_default", "instances": [[[[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0

In [None]:
import requests

SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'
response = requests.post(SERVER_URL, data=input_data_json)
response.raise_for_status() # raise an exception in case of error
response = response.json()

In [None]:
response.keys()

dict_keys(['predictions'])

In [None]:
y_proba = np.array(response["predictions"])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

### Using the gRPC API

In [None]:
from tensorflow_serving.apis.predict_pb2 import PredictRequest

request = PredictRequest()
request.model_spec.name = model_name
request.model_spec.signature_name = "serving_default"
input_name = model.input_names[0]
request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))

In [None]:
import grpc
from tensorflow_serving.apis import prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)
response = predict_service.Predict(request, timeout=10.0)

In [None]:
response

outputs {
  key: "dense_1"
  value {
    dtype: DT_FLOAT
    tensor_shape {
      dim {
        size: 3
      }
      dim {
        size: 10
      }
    }
    float_val: 0.00011425172124290839
    float_val: 1.513665068841874e-07
    float_val: 0.0009818424005061388
    float_val: 0.0027773496694862843
    float_val: 3.758880893656169e-06
    float_val: 7.6266449468676e-05
    float_val: 3.9139514740327286e-08
    float_val: 0.995561957359314
    float_val: 5.344580131350085e-05
    float_val: 0.00043088122038170695
    float_val: 0.0008194865076802671
    float_val: 3.5498320357874036e-05
    float_val: 0.9882420897483826
    float_val: 0.00705744931474328
    float_val: 1.2937064752804872e-07
    float_val: 0.00023402832448482513
    float_val: 0.0025743397418409586
    float_val: 9.668431610876382e-10
    float_val: 0.0010369382798671722
    float_val: 8.833576004008137e-08
    float_val: 4.441547571332194e-05
    float_val: 0.970328688621521
    float_val: 0.009044423699378967
    

Convert the response to a tensor:

In [None]:
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
y_proba = tf.make_ndarray(outputs_proto)
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

In [None]:
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
shape = [dim.size for dim in outputs_proto.tensor_shape.dim]
y_proba = np.array(outputs_proto.float_val).reshape(shape)
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

## Deploying a new model version

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(50, activation="relu"),
    keras.layers.Dense(50, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(learning_rate=1e-2),
              metrics=["accuracy"])
history = model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
model_version = "0002"
model_name = "my_mnist_model"
model_path = os.path.join(model_name, model_version)
model_path

'my_mnist_model/0002'

In [None]:
tf.saved_model.save(model, model_path)

INFO:tensorflow:Assets written to: my_mnist_model/0002/assets


In [None]:
for root, dirs, files in os.walk(model_name):
    indent = '    ' * root.count(os.sep)
    print('{}{}/'.format(indent, os.path.basename(root)))
    for filename in files:
        print('{}{}'.format(indent + '    ', filename))

my_mnist_model/
    0001/
        saved_model.pb
        variables/
            variables.data-00000-of-00001
            variables.index
        assets/
    0002/
        saved_model.pb
        variables/
            variables.data-00000-of-00001
            variables.index
        assets/


In [None]:
import requests

SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'

response = requests.post(SERVER_URL, data=input_data_json)
response.raise_for_status()
response = response.json()

In [None]:
response.keys()

dict_keys(['predictions'])

In [None]:
y_proba = np.array(response["predictions"])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.99, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]])

# Deploy the model to Google Cloud AI Platform

In [None]:
project_id = "onyx-smoke-242003"

In [None]:
import googleapiclient.discovery

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "my_service_account_private_key.json"
model_id = "my_mnist_model"
model_path = "projects/{}/models/{}".format(project_id, model_id)
model_path += "/versions/v0001/"
ml_resource = googleapiclient.discovery.build("ml", "v1").projects()

In [None]:
def predict(X):
    input_data_json = {"signature_name": "serving_default",
                       "instances": X.tolist()}
    request = ml_resource.predict(name=model_path, body=input_data_json)
    response = request.execute()
    if "error" in response:
        raise RuntimeError(response["error"])
    return np.array([pred[output_name] for pred in response["predictions"]])

In [None]:
Y_probas = predict(X_new)
np.round(Y_probas, 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.99, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]])

# Using GPUs

In [None]:
#tf.test.is_gpu_available() # deprecated
tf.config.list_physical_devices('GPU')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'),
 PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU')]

In [None]:
tf.test.gpu_device_name()

'/device:GPU:0'

In [None]:
tf.test.is_built_with_cuda()

True

In [None]:
from tensorflow.python.client.device_lib import list_local_devices

devices = list_local_devices()
devices

[name: "/device:CPU:0"
 device_type: "CPU"
 memory_limit: 268435456
 locality {
 }
 incarnation: 7325002731160755624,
 name: "/device:GPU:0"
 device_type: "GPU"
 memory_limit: 11139884032
 locality {
   bus_id: 1
   links {
     link {
       device_id: 1
       type: "StreamExecutor"
       strength: 1
     }
   }
 }
 incarnation: 7150956550266107441
 physical_device_desc: "device: 0, name: Tesla K80, pci bus id: 0000:00:04.0, compute capability: 3.7",
 name: "/device:GPU:1"
 device_type: "GPU"
 memory_limit: 11139884032
 locality {
   bus_id: 1
   links {
     link {
       type: "StreamExecutor"
       strength: 1
     }
   }
 }
 incarnation: 15909479382059415698
 physical_device_desc: "device: 1, name: Tesla K80, pci bus id: 0000:00:05.0, compute capability: 3.7"]

# Distributed Training

In [None]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

In [None]:
def create_model():
    return keras.models.Sequential([
        keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
                            padding="same", input_shape=[28, 28, 1]),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Flatten(),
        keras.layers.Dense(units=64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(units=10, activation='softmax'),
    ])

In [None]:
batch_size = 100
model = create_model()
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(learning_rate=1e-2),
              metrics=["accuracy"])
model.fit(X_train, y_train, epochs=10,
          validation_data=(X_valid, y_valid), batch_size=batch_size)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f014831c110>

In [None]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

distribution = tf.distribute.MirroredStrategy()

with distribution.scope():
    model = create_model()
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=keras.optimizers.SGD(learning_rate=1e-2),
                  metrics=["accuracy"])

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


In [None]:
batch_size = 100
model.fit(X_train, y_train, epochs=10,
          validation_data=(X_valid, y_valid), batch_size=batch_size)

Epoch 1/10
INFO:tensorflow:batch_all_reduce: 10 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:batch_all_reduce: 10 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/t

<tensorflow.python.keras.callbacks.History at 0x7f259bf1a6d0>

In [None]:
model.predict(X_new)

array([[2.53707055e-10, 7.94509292e-10, 1.02021443e-06, 3.37102080e-08,
        4.90816797e-11, 4.37713789e-11, 2.43314297e-14, 9.99996424e-01,
        1.50591750e-09, 2.50736753e-06],
       [1.11715025e-07, 8.56921833e-05, 9.99914169e-01, 6.31697228e-09,
        3.99949344e-11, 4.47976906e-10, 8.46022008e-09, 3.03771834e-08,
        2.91782563e-08, 1.95555502e-10],
       [4.68117065e-07, 9.99787748e-01, 1.01387537e-04, 2.87393277e-06,
        5.29725839e-05, 1.55926125e-06, 2.07211669e-05, 1.76809226e-05,
        9.37155255e-06, 5.19965897e-06]], dtype=float32)

Custom training loop:

In [None]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

K = keras.backend

distribution = tf.distribute.MirroredStrategy()

with distribution.scope():
    model = create_model()
    optimizer = keras.optimizers.SGD()

with distribution.scope():
    dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat().batch(batch_size)
    input_iterator = distribution.make_dataset_iterator(dataset)

@tf.function
def train_step():
    def step_fn(inputs):
        X, y = inputs
        with tf.GradientTape() as tape:
            Y_proba = model(X)
            loss = K.sum(keras.losses.sparse_categorical_crossentropy(y, Y_proba)) / batch_size

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        return loss

    per_replica_losses = distribution.experimental_run(step_fn, input_iterator)
    mean_loss = distribution.reduce(tf.distribute.ReduceOp.SUM,
                                    per_replica_losses, axis=None)
    return mean_loss

n_epochs = 10
with distribution.scope():
    input_iterator.initialize()
    for epoch in range(n_epochs):
        print("Epoch {}/{}".format(epoch + 1, n_epochs))
        for iteration in range(len(X_train) // batch_size):
            print("\rLoss: {:.3f}".format(train_step().numpy()), end="")
        print()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
Instructions for updating:
Use the iterator's `initializer` property instead.
Epoch 1/10
INFO:tensorflow:batch_all_reduce: 10 all-reduces with algorithm = nccl, num_packs = 1
INFO:tensorflow:batch_all_reduce: 10 all-reduces with algorithm = nccl, num_packs = 1
Loss: 0.380
Epoch 2/10
Loss: 0.302
Epoch 3/10
Loss: 0.285
Epoch 4/10
Loss: 0.294
Epoch 5/10
Loss: 0.304
Epoch 6/10
Loss: 0.310
Epoch 7/10
Loss: 0.310
Epoch 8/10
Loss: 0.306
Epoch 9/10
Loss: 0.303
Epoch 10/10
Loss: 0.298


## Training across multiple servers

Kluster TensorFlow adalah sekelompok proses TensorFlow yang berjalan secara paralel, biasanya pada mesin yang berbeda, dan saling berkomunikasi untuk menyelesaikan beberapa pekerjaan, misalnya melatih atau mengeksekusi jaringan neural. Setiap proses TF dalam kluster disebut "tugas" (atau "server TF"). Proses ini memiliki alamat IP, port, dan tipe (juga disebut peran atau pekerjaannya). Tipe tersebut dapat berupa `"worker"`, `"chief"`, `"ps"` (server parameter) atau `"evaluator"`:
* Setiap **worker** melakukan komputasi, biasanya pada mesin dengan satu atau beberapa GPU.
* **Chief** juga melakukan komputasi, tetapi juga menangani pekerjaan tambahan seperti menulis log TensorBoard atau menyimpan titik pemeriksaan. Ada satu chief dalam kluster, biasanya pekerja pertama (yaitu, pekerja #0).
* **Server parameter** (ps) hanya melacak nilai variabel, biasanya pada mesin khusus CPU. * **Evaluator** jelas menangani evaluasi. Biasanya ada satu evaluator dalam satu klaster.

Kumpulan tugas yang memiliki tipe yang sama sering disebut "pekerjaan". Misalnya, pekerjaan "pekerja" adalah kumpulan semua pekerja.

Untuk memulai klaster TensorFlow, Anda harus mendefinisikannya terlebih dahulu. Ini berarti menentukan semua tugas (alamat IP, port TCP, dan tipe). Misalnya, spesifikasi klaster berikut mendefinisikan klaster dengan 3 tugas (2 pekerja dan 1 server parameter). Ini adalah kamus dengan satu kunci per pekerjaan, dan nilainya adalah daftar alamat tugas:

In [None]:
cluster_spec = {
    "worker": [
        "machine-a.example.com:2222",  # /job:worker/task:0
        "machine-b.example.com:2222"   # /job:worker/task:1
    ],
    "ps": ["machine-c.example.com:2222"] # /job:ps/task:0
}

Setiap tugas dalam kluster dapat berkomunikasi dengan setiap tugas lain di server, jadi pastikan untuk mengonfigurasi firewall Anda guna mengotorisasi semua komunikasi antara mesin-mesin ini pada port-port ini (biasanya lebih mudah jika Anda menggunakan port yang sama pada setiap mesin).

Saat tugas dimulai, tugas tersebut perlu diberi tahu tugasnya: jenis dan indeksnya (indeks tugas juga disebut id tugas). Cara umum untuk menentukan semuanya sekaligus (baik spesifikasi kluster maupun jenis dan id tugas saat ini) adalah dengan menyetel variabel lingkungan `TF_CONFIG` sebelum memulai program. Variabel tersebut harus berupa kamus berkode JSON yang berisi spesifikasi kluster (di bawah kunci `"cluster"`), dan jenis serta indeks tugas yang akan dimulai (di bawah kunci `"task"`). Misalnya, variabel lingkungan `TF_CONFIG` berikut mendefinisikan kluster yang sama seperti di atas, dengan 2 pekerja dan 1 server parameter, dan menetapkan bahwa tugas yang akan dimulai adalah pekerja #1:

In [None]:
import os
import json

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": cluster_spec,
    "task": {"type": "worker", "index": 1}
})
os.environ["TF_CONFIG"]

'{"cluster": {"worker": ["machine-a.example.com:2222", "machine-b.example.com:2222"], "ps": ["machine-c.example.com:2222"]}, "task": {"type": "worker", "index": 1}}'

Kelas `TFConfigClusterResolver` TensorFlow membaca konfigurasi kluster dari variabel lingkungan ini:

In [None]:
import tensorflow as tf

resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
resolver.cluster_spec()

ClusterSpec({'ps': ['machine-c.example.com:2222'], 'worker': ['machine-a.example.com:2222', 'machine-b.example.com:2222']})

In [None]:
resolver.task_type

'worker'

In [None]:
resolver.task_id

1

In [None]:
%%writefile my_mnist_multiworker_task.py

import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
import time

# At the beginning of the program
distribution = tf.distribute.MultiWorkerMirroredStrategy()

resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
print("Starting task {}{}".format(resolver.task_type, resolver.task_id))

# Only worker #0 will write checkpoints and log to TensorBoard
if resolver.task_id == 0:
    root_logdir = os.path.join(os.curdir, "my_mnist_multiworker_logs")
    run_id = time.strftime("run_%Y_%m_%d-%H_%M_%S")
    run_dir = os.path.join(root_logdir, run_id)
    callbacks = [
        keras.callbacks.TensorBoard(run_dir),
        keras.callbacks.ModelCheckpoint("my_mnist_multiworker_model.h5",
                                        save_best_only=True),
    ]
else:
    callbacks = []

# Load and prepare the MNIST dataset
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train_full = X_train_full[..., np.newaxis] / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

with distribution.scope():
    model = keras.models.Sequential([
        keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
                            padding="same", input_shape=[28, 28, 1]),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Flatten(),
        keras.layers.Dense(units=64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(units=10, activation='softmax'),
    ])
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=keras.optimizers.SGD(learning_rate=1e-2),
                  metrics=["accuracy"])

model.fit(X_train, y_train, validation_data=(X_valid, y_valid),
          epochs=10, callbacks=callbacks)

Overwriting my_mnist_multiworker_task.py


Dalam aplikasi dunia nyata, biasanya hanya ada satu pekerja per mesin, tetapi dalam contoh ini, kami menjalankan kedua pekerja pada mesin yang sama, sehingga keduanya akan mencoba menggunakan semua RAM GPU yang tersedia (jika mesin ini memiliki GPU), dan ini kemungkinan akan menyebabkan kesalahan Kehabisan Memori (OOM). Untuk menghindari hal ini, kami dapat menggunakan variabel lingkungan `CUDA_VISIBLE_DEVICES` untuk menetapkan GPU yang berbeda untuk setiap pekerja. Atau, kami dapat menonaktifkan dukungan GPU, seperti ini:

In [None]:
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

In [None]:
import subprocess

cluster_spec = {"worker": ["127.0.0.1:9901", "127.0.0.1:9902"]}

for index, worker_address in enumerate(cluster_spec["worker"]):
    os.environ["TF_CONFIG"] = json.dumps({
        "cluster": cluster_spec,
        "task": {"type": "worker", "index": index}
    })
    subprocess.Popen("python my_mnist_multiworker_task.py", shell=True)

In [None]:
from tensorflow import keras

model = keras.models.load_model("my_mnist_multiworker_model.h5")
Y_pred = model.predict(X_new)
np.argmax(Y_pred, axis=-1)

array([7, 2, 1])

# 📘 Chapter 19: Training and Deploying TensorFlow Models at Scale



##  Tujuan Bab

* Memahami teknik dan arsitektur untuk melatih model TensorFlow secara skala besar.
* Menguasai praktik terbaik deployment model ML ke produksi dengan efisien dan scalable.
* Memahami integrasi pipeline ML, monitoring, dan continuous training.



## 1. **Challenges of Large-Scale Training**

* Dataset besar → memerlukan **distributed training** dan **data pipeline efisien**.
* Model besar → membutuhkan **resource komputasi tinggi**, GPU/TPU.
* Menghindari overfitting dan memastikan generalisasi.
* Deployment membutuhkan model yang:

  * Fast inference
  * Scalable dan reliable
  * Mudah dipantau dan diperbarui



## 2. **Distributed Training dengan TensorFlow**

### 2.1 Distributed Strategy API

TensorFlow menyediakan beberapa strategi untuk parallel dan distributed training:

| Strategy                      | Keterangan                                      |
| ----------------------------- | ----------------------------------------------- |
| `MirroredStrategy`            | Data parallelism di satu mesin dengan multi-GPU |
| `MultiWorkerMirroredStrategy` | Distributed data parallelism di beberapa mesin  |
| `TPUStrategy`                 | Khusus TPU di Google Cloud                      |



### 2.2 Contoh penggunaan `MirroredStrategy`

```python
strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    model = create_model()
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    
model.fit(dataset, epochs=10)
```

* `strategy.scope()` menjamin semua variabel dan operasi model di-replicate ke semua device.



## 3. **Input Pipeline untuk Scale**

### 3.1 Gunakan `tf.data` API

* Pipeline data efisien dengan metode `map()`, `batch()`, `prefetch()`, `shuffle()`.
* Contoh pipeline:

```python
def preprocess(image, label):
    image = tf.image.resize(image, [224, 224])
    image = image / 255.0
    return image, label

dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(preprocess).batch(32).prefetch(tf.data.AUTOTUNE)
```



### 3.2 TFRecord Format

* Format penyimpanan data yang efisien dan mudah di-streaming.
* Cocok untuk dataset besar.



## 4. **Checkpointing dan TensorBoard**

### 4.1 Checkpointing

* Simpan state model secara periodik untuk pemulihan (recovery).
* Contoh:

```python
checkpoint_path = "training_1/cp.ckpt"
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                         save_weights_only=True,
                                                         verbose=1)
model.fit(dataset, epochs=10, callbacks=[checkpoint_callback])
```

### 4.2 Monitoring dengan TensorBoard

* Visualisasi loss, accuracy, dan metrics lain.
* Logging menggunakan:

```python
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir='./logs')
model.fit(dataset, epochs=10, callbacks=[tensorboard_callback])
```



## 5. **Model Deployment**

### 5.1 Export Model

* Ekspor model Keras ke format SavedModel:

```python
model.save("saved_model/my_model")
```

* Format ini portable dan dapat digunakan di berbagai platform.



### 5.2 Serving Model dengan TensorFlow Serving

* TensorFlow Serving adalah server khusus untuk serving model TensorFlow di production.
* Mendukung versioning model dan update tanpa downtime.
* Contoh command:

```bash
tensorflow_model_server --rest_api_port=8501 --model_name=my_model --model_base_path=/path/to/saved_model/my_model/
```



### 5.3 Deployment di Cloud

* Cloud platform seperti Google AI Platform, AWS SageMaker, Azure ML menyediakan managed service untuk training dan deployment.
* Integrasi dengan CI/CD pipeline untuk retraining otomatis.



## 6. **Optimasi Model untuk Deployment**

### 6.1 Model Quantization

* Mengurangi ukuran model dan mempercepat inference.
* Tersedia post-training quantization:

```python
converter = tf.lite.TFLiteConverter.from_saved_model("saved_model/my_model")
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
```

### 6.2 Pruning dan Clustering

* Mengurangi kompleksitas model dengan menghapus koneksi kecil (pruning).
* Clustering bobot untuk kompresi.



## 7. **Continuous Training & Monitoring**

* Automasi retraining model dengan data terbaru.
* Monitoring performa model secara real-time untuk deteksi data drift atau degradasi.



## 8. **Best Practices**

| Aspek                | Praktik Terbaik                                      |
| -------------------- | ---------------------------------------------------- |
| Data Pipeline        | Pipeline paralel, prefetch, cache                    |
| Distributed Training | Gunakan `strategy` API, batch besar                  |
| Checkpointing        | Sering simpan checkpoint dan gunakan untuk recovery  |
| Deployment           | Gunakan SavedModel, TensorFlow Serving, quantization |
| Monitoring           | TensorBoard, logging metrik, alerting                |
| Automation           | Integrasi CI/CD, retraining otomatis                 |



## 📈 **Kesimpulan**

Training dan deployment model TensorFlow skala besar memerlukan kombinasi teknologi dan praktik modern:

* **Distributed training** dan optimasi pipeline data untuk memanfaatkan resource komputasi secara efisien.
* **Checkpointing dan monitoring** untuk menjaga stabilitas dan performa.
* **Deployment yang scalable** menggunakan TensorFlow Serving atau platform cloud.
* **Optimasi model** seperti quantization untuk inference yang cepat dan ringan.
* Pendekatan **continuous training** dan monitoring membantu menjaga performa model dalam produksi.

