**Rozdział 19. Wielkoskalowe uczenie i wdrażanie modeli TensorFlow**

_Notatnik ten zawiera przykładowy kod i rozwiązania ćwiczeń opisane w rozdziale 19._

<table align="left">
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/ageron/handson-ml2/blob/master/19_training_and_deploying_at_scale.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Uruchom w Google Colab (wersja angielska)</a>
  </td>
</table>

# Konfiguracja

Importujmy najpierw kilka popularnych modułów, upewnijmy się, że będą wstawiane wykresy MatplotLib, a także przygotujmy funkcję zapisującą rysunki. Sprawdzimy także, czy jest zainstalowane środowisko Python 3.5 lub nowsze (możliwe, że kod będzie działał w środowisku Python 2.x, zostało ono jednak porzucone, dlatego zalecamy korzystanie ze środowiska Python 3), a także biblioteka Scikit-Learn 0.20 lub nowsza i TensorFlow 2.0 lub nowszy.

In [1]:
# Wymagane środowisko Python ≥3.5
import sys
assert sys.version_info >= (3, 5)

# Wymagana biblioteka Scikit-Learn ≥0.20
import sklearn
assert sklearn.__version__ >= "0.20"

try:
    # %tensorflow_version istnieje jedynie w środowisku Colab.
    %tensorflow_version 2.x
    !echo "deb http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" > /etc/apt/sources.list.d/tensorflow-serving.list
    !curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | apt-key add -
    !apt update && apt-get install -y tensorflow-model-server
    !pip install -q -U tensorflow-serving-api
    IS_COLAB = True
except Exception:
    IS_COLAB = False

# Wymagany moduł TensorFlow ≥2.0
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

if not tf.test.is_gpu_available():
    print("Nie wykryto procesora graficznego. Bez niego sieci splotowe mogą działać bardzo powoli.")
    if IS_COLAB:
        print("Kliknij Runtime > Change runtime i wybierz akcelerator graficzny.")

# Importuje standardowe biblioteki
import numpy as np
import os

# Aby wyniki uzyskiwane w tym notatniku były odtwarzalne
np.random.seed(42)
tf.random.set_seed(42)

# Do rysowania ładnych wykresów
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# Ścieżka zapisywania rysunków
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "R19"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "rysunki", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("Zapisywanie rysunku", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

TensorFlow 2.x selected.
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2943  100  2943    0     0  28028      0 --:--:-- --:--:-- --:--:-- 28298
OK
Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:2 http://storage.googleapis.com/tensorflow-serving-apt stable InRelease [3,012 B]
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:9 https://cloud.r-project.org/bin/linux/ubuntu bion

# Wdrażanie modeli TensorFlow do usługi TensorFlow Serving (TFS)
Skorzystamy z interfejsu REST lub gRPC.

## Zapisywanie/wczytywanie obiektu `SavedModel`

In [2]:
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train_full = X_train_full[..., np.newaxis].astype(np.float32) / 255.
X_test = X_test[..., np.newaxis].astype(np.float32) / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
X_new = X_test[:3]

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [3]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(100, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(lr=1e-2),
              metrics=["accuracy"])
model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Train on 55000 samples, validate on 5000 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f6f632856a0>

In [4]:
np.round(model.predict(X_new), 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

In [5]:
model_version = "0001"
model_name = "moj_model_mnist"
model_path = os.path.join(model_name, model_version)
model_path

'moj_model_mnist/0001'

In [0]:
!rm -rf {model_name}

In [9]:
tf.saved_model.save(model, model_path)

Instructions for updating:
If using Keras pass *_constraint arguments to layers.
INFO:tensorflow:Assets written to: moj_model_mnist/0001/assets


In [10]:
for root, dirs, files in os.walk(model_name):
    indent = '    ' * root.count(os.sep)
    print('{}{}/'.format(indent, os.path.basename(root)))
    for filename in files:
        print('{}{}'.format(indent + '    ', filename))

moj_model_mnist/
    0001/
        saved_model.pb
        variables/
            variables.index
            variables.data-00000-of-00002
            variables.data-00001-of-00002
        assets/


In [11]:
!saved_model_cli show --dir {model_path}

The given SavedModel contains the following tag-sets:
serve


In [12]:
!saved_model_cli show --dir {model_path} --tag_set serve

The given SavedModel MetaGraphDef contains SignatureDefs with the following keys:
SignatureDef key: "__saved_model_init_op"
SignatureDef key: "serving_default"


In [13]:
!saved_model_cli show --dir {model_path} --tag_set serve \
                      --signature_def serving_default

The given SavedModel SignatureDef contains the following input(s):
  inputs['flatten_input'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 28, 28, 1)
      name: serving_default_flatten_input:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['dense_1'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 10)
      name: StatefulPartitionedCall:0
Method name is: tensorflow/serving/predict


In [14]:
!saved_model_cli show --dir {model_path} --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['__saved_model_init_op']:
  The given SavedModel SignatureDef contains the following input(s):
  The given SavedModel SignatureDef contains the following output(s):
    outputs['__saved_model_init_op'] tensor_info:
        dtype: DT_INVALID
        shape: unknown_rank
        name: NoOp
  Method name is: 

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['flatten_input'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 28, 28, 1)
        name: serving_default_flatten_input:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['dense_1'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 10)
        name: StatefulPartitionedCall:0
  Method name is: tensorflow/serving/predict
Instructions for updating:
If using Keras pass *_constraint arguments to layers.

Defined Functions:
  Function Name: '

Wprowadźmy nowe przykłady do pliku `npy`, dzięki czemu będziemy mogli z łatwością przekazywać je do modelu:

In [0]:
np.save("moje_testy_mnist.npy", X_new)

In [16]:
input_name = model.input_names[0]
input_name

'flatten_input'

Skorzystajmy teraz z narzędzia `saved_model_cli`, aby uzyskać prognozy dla dopiero co zapisanych przykładów:

In [17]:
!saved_model_cli run --dir {model_path} --tag_set serve \
                     --signature_def serving_default    \
                     --inputs {input_name}=my_mnist_tests.npy

Traceback (most recent call last):
  File "/tensorflow-2.1.0/python3.6/bin/saved_model_cli", line 8, in <module>
    sys.exit(main())
  File "/tensorflow-2.1.0/python3.6/tensorflow_core/python/tools/saved_model_cli.py", line 990, in main
    args.func(args)
  File "/tensorflow-2.1.0/python3.6/tensorflow_core/python/tools/saved_model_cli.py", line 720, in run
    args.inputs, args.input_exprs, args.input_examples)
  File "/tensorflow-2.1.0/python3.6/tensorflow_core/python/tools/saved_model_cli.py", line 635, in load_inputs_from_input_arg_string
    data = np.load(file_io.FileIO(filename, mode='rb'), allow_pickle=True)
  File "/tensorflow-2.1.0/python3.6/numpy/lib/npyio.py", line 436, in load
    magic = fid.read(N)
  File "/tensorflow-2.1.0/python3.6/tensorflow_core/python/lib/io/file_io.py", line 122, in read
    self._preread_check()
  File "/tensorflow-2.1.0/python3.6/tensorflow_core/python/lib/io/file_io.py", line 84, in _preread_check
    compat.as_bytes(self.__name), 1024 * 512)
t

In [18]:
np.round([[1.1739199e-04, 1.1239604e-07, 6.0210604e-04, 2.0804715e-03, 2.5779348e-06,
           6.4079795e-05, 2.7411186e-08, 9.9669880e-01, 3.9654213e-05, 3.9471846e-04],
          [1.2294615e-03, 2.9207937e-05, 9.8599273e-01, 9.6755642e-03, 8.8930705e-08,
           2.9156188e-04, 1.5831805e-03, 1.1311053e-09, 1.1980456e-03, 1.1113169e-07],
          [6.4066830e-05, 9.6359509e-01, 9.0598064e-03, 2.9872139e-03, 5.9552520e-04,
           3.7478798e-03, 2.5074568e-03, 1.1462728e-02, 5.5553433e-03, 4.2495009e-04]], 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.96, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.01, 0.  ]])

## TensorFlow Serving

Zainstaluj [Dockera](https://docs.docker.com/install/), jeśli jeszcze tego nie zrobiłeś. Następnie uruchom polecenie:

```bash
docker pull tensorflow/serving

export ML_PATH=$HOME/um # lub dowolna inna lokalizacja projektu
docker run -it --rm -p 8500:8500 -p 8501:8501 \
   -v "$ML_PATH/moj_model_mnist:/models/moj_model_mnist" \
   -e MODEL_NAME=moj_model_mnist \
   tensorflow/serving
```
Gdy już przestaniesz korzystać z serwera, wyłączysz go za pomocą kombinacji klawiszy Ctrl+C.

Ewentualnie, jeżeli jest zainstalowany `tensorflow_model_server` (tzn. jeśli uruchomiłeś ten notatnik w środowisku Colab), to poniższe trzy komórki uruchomią serwer:

In [0]:
os.environ["MODEL_DIR"] = os.path.split(os.path.abspath(model_path))[0]

In [20]:
%%bash --bg
nohup tensorflow_model_server \
     --rest_api_port=8501 \
     --model_name=my_mnist_model \
     --model_base_path="${MODEL_DIR}" >server.log 2>&1

Starting job # 0 in a separate thread.


In [21]:
!tail server.log

[warn] getaddrinfo: address family for nodename not supported
[evhttp_server.cc : 238] NET_LOG: Entering the event loop ...


In [0]:
import json

input_data_json = json.dumps({
    "signature_name": "serving_default",
    "instances": X_new.tolist(),
})

In [23]:
repr(input_data_json)[:1500] + "..."

'\'{"signature_name": "serving_default", "instances": [[[[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0

Wykorzystajmy teraz interfejs REST do uzyskiwania prognoz:

In [0]:
import requests

SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'
response = requests.post(SERVER_URL, data=input_data_json)
response.raise_for_status() # raise an exception in case of error
response = response.json()

In [26]:
response.keys()

dict_keys(['predictions'])

In [27]:
y_proba = np.array(response["predictions"])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

### Korzystanie z interfejsu gRPC

In [0]:
from tensorflow_serving.apis.predict_pb2 import PredictRequest

request = PredictRequest()
request.model_spec.name = model_name
request.model_spec.signature_name = "serving_default"
input_name = model.input_names[0]
request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))

In [0]:
import grpc
from tensorflow_serving.apis import prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)
response = predict_service.Predict(request, timeout=10.0)

In [0]:
response

outputs {
  key: "dense_4"
  value {
    dtype: DT_FLOAT
    tensor_shape {
      dim {
        size: 3
      }
      dim {
        size: 10
      }
    }
    float_val: 2.0824443708988838e-05
    float_val: 1.4913139168015732e-08
    float_val: 0.0004813199338968843
    float_val: 0.001888890634290874
    float_val: 2.682592992186983e-07
    float_val: 8.666840585647151e-06
    float_val: 1.6853943241024183e-10
    float_val: 0.9975269436836243
    float_val: 3.833709342870861e-05
    float_val: 3.4738284739432856e-05
    float_val: 0.00017358684272039682
    float_val: 0.0002858016814570874
    float_val: 0.9816810488700867
    float_val: 0.0157401692122221
    float_val: 1.1949770339914068e-10
    float_val: 0.00023017563216853887
    float_val: 3.078056761296466e-05
    float_val: 5.393230750883049e-09
    float_val: 0.0018584482604637742
    float_val: 1.8884094288296183e-09
    float_val: 3.397366526769474e-05
    float_val: 0.9835277795791626
    float_val: 0.001533020636998117


Przekształcamy odpowiedź w tensor:

In [0]:
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
y_proba = tf.make_ndarray(outputs_proto)
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.98, 0.02, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.98, 0.  , 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

Lub w tablicę NumPy, jeżeli Twój klient nie zawiera biblioteki TensorFlow:

In [0]:
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
shape = [dim.size for dim in outputs_proto.tensor_shape.dim]
y_proba = np.array(outputs_proto.float_val).reshape(shape)
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.98, 0.02, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.98, 0.  , 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

## Wdrażanie nowej wersji modelu

In [33]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(50, activation="relu"),
    keras.layers.Dense(50, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(lr=1e-2),
              metrics=["accuracy"])
history = model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Train on 55000 samples, validate on 5000 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [34]:
model_version = "0002"
model_name = "moj_model_mnist"
model_path = os.path.join(model_name, model_version)
model_path

'moj_model_mnist/0002'

In [35]:
tf.saved_model.save(model, model_path)

INFO:tensorflow:Assets written to: moj_model_mnist/0002/assets


In [36]:
for root, dirs, files in os.walk(model_name):
    indent = '    ' * root.count(os.sep)
    print('{}{}/'.format(indent, os.path.basename(root)))
    for filename in files:
        print('{}{}'.format(indent + '    ', filename))

moj_model_mnist/
    0001/
        saved_model.pb
        variables/
            variables.index
            variables.data-00000-of-00002
            variables.data-00001-of-00002
        assets/
    0002/
        saved_model.pb
        variables/
            variables.index
            variables.data-00000-of-00002
            variables.data-00001-of-00002
        assets/


**Ostrzeżenie**: Może upłynąć trochę czasu, zanim nowy model zostanie wczytany przez TensorFlow Serving.

In [0]:
import requests

SERVER_URL = 'http://localhost:8501/v1/models/my_mnist_model:predict'
            
response = requests.post(SERVER_URL, data=input_data_json)
response.raise_for_status()
response = response.json()

In [38]:
response.keys()

dict_keys(['predictions'])

In [39]:
y_proba = np.array(response["predictions"])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.99, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]])

# Wdrażanie modelu do serwisu GCP Google

W książce znajdziesz instrukcje wdrażania modelu do środowiska Google Cloud AI Platform, pobierania klucza prywatnego konta usługi i zapisania go w pliku `moj_klucz_konta_uslugi.json`. Trzeba także zaktualizować `project_id`:

In [0]:
project_id = "onyx-smoke-242003"

In [0]:
import googleapiclient.discovery

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "moj_klucz_konta_uslugi.json"
model_id = "moj_model_mnist"
model_path = "projects/{}/models/{}".format(project_id, model_id)
model_path += "/versions/v0001/" # jeżeli chcesz uruchomić określoną wersję
ml_resource = googleapiclient.discovery.build("ml", "v1").projects()

In [0]:
def predict(X):
    input_data_json = {"signature_name": "serving_default",
                       "instances": X.tolist()}
    request = ml_resource.predict(name=model_path, body=input_data_json)
    response = request.execute()
    if "error" in response:
        raise RuntimeError(response["error"])
    return np.array([pred[output_name] for pred in response["predictions"]])

In [0]:
Y_probas = predict(X_new)
np.round(Y_probas, 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.96, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.01, 0.  ]])

# Korzystanie z kart graficznych

In [0]:
tf.test.is_gpu_available()

False

In [0]:
tf.test.gpu_device_name()

''

In [0]:
tf.test.is_built_with_cuda()

False

In [0]:
from tensorflow.python.client.device_lib import list_local_devices

devices = list_local_devices()
devices

[name: "/device:CPU:0"
 device_type: "CPU"
 memory_limit: 268435456
 locality {
 }
 incarnation: 11178133101787456811]

# Uczenie rozproszone

In [0]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

In [0]:
def create_model():
    return keras.models.Sequential([
        keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
                            padding="same", input_shape=[28, 28, 1]),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"), 
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Flatten(),
        keras.layers.Dense(units=64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(units=10, activation='softmax'),
    ])

In [0]:
batch_size = 100
model = create_model()
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(lr=1e-2),
              metrics=["accuracy"])
model.fit(X_train, y_train, epochs=10,
          validation_data=(X_valid, y_valid), batch_size=batch_size)

In [0]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

distribution = tf.distribute.MirroredStrategy()

# Zmienia domyślny algorytm All-Reduce:
#distribution = tf.distribute.MirroredStrategy(
#    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

# Wyznacza listę używanych procesorów graficznych:
#distribution = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

# Wprowadza strategię magazynu centralnego:
#distribution = tf.distribute.experimental.CentralStorageStrategy()

#resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
#tf.tpu.experimental.initialize_tpu_system(resolver)
#distribution = tf.distribute.experimental.TPUStrategy(resolver)

with distribution.scope():
    model = create_model()
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=keras.optimizers.SGD(lr=1e-2),
                  metrics=["accuracy"])

W0603 15:31:26.178871 140735810999168 cross_device_ops.py:1178] There is non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.


In [0]:
batch_size = 100 # must be divisible by the number of workers
model.fit(X_train, y_train, epochs=10,
          validation_data=(X_valid, y_valid), batch_size=batch_size)

In [0]:
model.predict(X_new)

array([[0.09101252, 0.07083996, 0.06410537, 0.11957529, 0.06693752,
        0.05124901, 0.04676544, 0.23180223, 0.13522181, 0.12249089],
       [0.08099081, 0.12387844, 0.14915964, 0.13171668, 0.05875394,
        0.08834281, 0.16267018, 0.06899565, 0.07834874, 0.05714307],
       [0.04303756, 0.2682051 , 0.0909673 , 0.11496522, 0.06084979,
        0.07125981, 0.08520001, 0.08517107, 0.09236596, 0.0879782 ]],
      dtype=float32)

Niestandardowa pętla uczenia:

In [0]:
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

K = keras.backend

distribution = tf.distribute.MirroredStrategy()

with distribution.scope():
    model = create_model()
    optimizer = keras.optimizers.SGD()

with distribution.scope():
    dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).repeat().batch(batch_size)
    input_iterator = distribution.make_dataset_iterator(dataset)
    
@tf.function
def train_step():
    def step_fn(inputs):
        X, y = inputs
        with tf.GradientTape() as tape:
            Y_proba = model(X)
            loss = K.sum(keras.losses.sparse_categorical_crossentropy(y, Y_proba)) / batch_size

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        return loss

    per_replica_losses = distribution.experimental_run(step_fn, input_iterator)
    mean_loss = distribution.reduce(tf.distribute.ReduceOp.SUM,
                                    per_replica_losses, axis=None)
    return mean_loss

n_epochs = 10
with distribution.scope():
    input_iterator.initialize()
    for epoch in range(n_epochs):
        print("Epoka numer {}/{}".format(epoch + 1, n_epochs))
        for iteration in range(len(X_train) // batch_size):
            print("\rFunkcja straty: {:.3f}".format(train_step().numpy()), end="")
        print()

In [0]:
batch_size = 100 # musi być podzielne przez liczbę roboczych grup zadań
model.fit(X_train, y_train, epochs=10,
          validation_data=(X_valid, y_valid), batch_size=batch_size)

## Uczenie za pomocą wielu serwerów

Klaster TensorFlow to grupa równolegle przetwarzanych (zazwyczaj na osobnych komputerach) procesów TensorFlow, komunikujących się ze sobą w celu ukończenia jakiegoś zadania, na przykład wytrenowania lub uruchomienia sieci neuronowej. Każdy proces w klastrze TF nosi nazwę zadania lub serwera TF. Każde zadanie zawiera własny adres IP, port i przynależy do określonego typu (zwanego także rolą albo grupą zadań). Wyróżniamy cztery typy zadań: roboczą grupę zadań (`"worker"`), serwer główny (`"chief"`), serwer parametrów (`"ps"`) i ewaluatora (`"evaluator"`):
* Każda **robocza grupa zadań** przeprowadza obliczenia, zazwyczaj na komputerze wyposażonym w co najmniej jedną kartę graficzną. 
* **Serwer główny** również przeprowadza obliczenia (jak robocza grupa zadań), ale zajmuje się także dodatkowymi czynnościami, takimi jak generowanie komunikatów zdarzeń TensorFlow czy zapisywanie punktów kontrolnych. W każdym klastrze znajduje się jeden serwer główny. Jeżeli nie został wyznaczony żaden serwer główny, rola ta przypada pierwszej roboczej grupie zadań.
* **Serwer parametrów** zajmuje się jedynie wartościami zmiennych i mieści się zazwyczaj na komputerze wyposażonym wyłącznie w jednostkę CPU.
* **Ewaluator** zajmuje się oceną modelu. Zazwyczaj w klastrze występuje jeden ewaluator.

Zbiór zadań tego samego typu często jest nazywany grupą zadań. Na przykład, grupa zadań "worker" stanowi zbiór wszystkich roboczych grup zadań.

Przed uruchomieniem klastra TensorFlow należy go najpierw zdefiniować. Oznacza to określenie adresu IP, portu i typu każdego zadania. Na przykład, poniższa specyfikacja klastra wyznacza klaster trzyzadaniowy (dwie robocze grupy zadań i jeden serwer parametrów). Specyfikacja klastra to słownik zawierający po jednym kluczu na każdą grupę zadań, natomiast jego wartości tworzą listy adresów zadań:

```
{
    "worker": ["moja-robocza-grupa-zadan0.przyklad.com:9876", "moja-robocza-grupa-zadan1.przyklad.com:9876"],
    "ps": ["moj-ps0.przyklad.com:9876"]
}
```

Domyślnie zadania w klastrze mogą się ze sobą komunikować, dlatego skonfiguruj odpowiednio zaporę sieciową tak, aby umożliwiała komunikację pomiędzy tymi komputerami poprzez wyznaczone porty (zazwyczaj ułatwiasz sobie życie, jeśli na wszystkich komputerach będziesz korzystać z tego samego portu).

W momencie uruchamiania zadania musisz podać mu specyfikację klastra, a także wyznaczyć mu typ i indeks (np. indeks zadania jest również nazywany identyfikatorem zadania). Najprostszym sposobem zdefiniowania wszystkich elementów jednocześnie (specyfikacji klastra, a także typu i indeksu bieżącego zadania) jest wyznaczenie zmiennej środowiskowej `TF_CONFIG` przed uruchomieniem programu. Musi mieć ona postać słownika kodowanego w formacie JSON, przechowującego specyfikację klastra (klucz `"cluster"`), a także typ i indeks bieżącego zadania (klucz `"task"`). Na przykład, poniższa zmienna środowiskowa `TF_CONFIG` wykorzystuje zdefiniowany przez nas wcześniej klaster i za jej pomocą wyznaczamy pierwszą roboczą grupę zadań jako zadanie do uruchomienia:

In [0]:
import os
import json

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["moja-praca0.przyklad.com:9876", "moja-praca1.przyklad.com:9876"],
        "ps": ["moj-ps0.przyklad.com:9876"]
    },
    "task": {"type": "worker", "index": 0}
})
print("TF_CONFIG='{}'".format(os.environ["TF_CONFIG"]))

TF_CONFIG='{"cluster": {"worker": ["my-work0.example.com:9876", "my-work1.example.com:9876"], "ps": ["my-ps0.example.com:9876"]}, "task": {"type": "worker", "index": 0}}'


Niektóre plarformy (np. Google Cloud ML Engine) automatycznie wyznaczają tę zmienną środowiskową.

Możesz następnie stworzyć prosty skrypt Python uruchamiający zadanie. Można używać tego samego skryptu na wszystkich komputerach, ponieważ będzie wczytywał zmienną `TF_CONFIG`, dzięki której będzie wiadomo, które zadanie ma zostać uruchomione:

In [0]:
import tensorflow as tf

resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
worker0 = tf.distribute.Server(resolver.cluster_spec(),
                               job_name=resolver.task_type,
                               task_index=resolver.task_id)

Możesz także wyznaczyć specyfikację klastra bezpośrednio w Pythonie zamiast korzystać ze zmiennej środowiskowej:

In [0]:
cluster_spec = tf.train.ClusterSpec({
    "worker": ["127.0.0.1:9901", "127.0.0.1:9902"],
    "ps": ["127.0.0.1:9903"]
})

Możesz następnie uruchomić serwer poprzez przekazanie mu specyfikacji klastra i określenie jego typu oraz indeksu. Zrealizujmy dwa pozostałe zadania (pamiętaj, że zazwyczaj chcemy uruchmiać tylko jedno zadanie na każdym komputerze; teraz uruchamiamy trzy na jednym urządzeniu wyłącznie w celach dydaktycznych):

In [0]:
#worker1 = tf.distribute.Server(cluster_spec, job_name="worker", task_index=1)
ps0 = tf.distribute.Server(cluster_spec, job_name="ps", task_index=0)

In [0]:
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["127.0.0.1:9901", "127.0.0.1:9902"],
        "ps": ["127.0.0.1:9903"]
    },
    "task": {"type": "worker", "index": 1}
})
print(repr(os.environ["TF_CONFIG"]))

'{"cluster": {"worker": ["127.0.0.1:9901", "127.0.0.1:9902"], "ps": ["127.0.0.1:9903"]}, "task": {"type": "worker", "index": 1}}'


In [0]:
distribution = tf.distribute.experimental.MultiWorkerMirroredStrategy()

keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["127.0.0.1:9901", "127.0.0.1:9902"],
        "ps": ["127.0.0.1:9903"]
    },
    "task": {"type": "worker", "index": 1}
})
#CUDA_VISIBLE_DEVICES=0 

with distribution.scope():
    model = create_model()
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=keras.optimizers.SGD(lr=1e-2),
                  metrics=["accuracy"])

In [0]:
import tensorflow as tf
from tensorflow import keras
import numpy as np

# Na początku programu (zrestartuj jądro przed uruchomieniem tej komórki)
distribution = tf.distribute.experimental.MultiWorkerMirroredStrategy()

(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train_full = X_train_full[..., np.newaxis] / 255.
X_test = X_test[..., np.newaxis] / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
X_new = X_test[:3]

n_workers = 2
batch_size = 32 * n_workers
dataset = tf.data.Dataset.from_tensor_slices((X_train[..., np.newaxis], y_train)).repeat().batch(batch_size)
    
def create_model():
    return keras.models.Sequential([
        keras.layers.Conv2D(filters=64, kernel_size=7, activation="relu",
                            padding="same", input_shape=[28, 28, 1]),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"), 
        keras.layers.Conv2D(filters=128, kernel_size=3, activation="relu",
                            padding="same"),
        keras.layers.MaxPooling2D(pool_size=2),
        keras.layers.Flatten(),
        keras.layers.Dense(units=64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(units=10, activation='softmax'),
    ])

with distribution.scope():
    model = create_model()
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=keras.optimizers.SGD(lr=1e-2),
                  metrics=["accuracy"])

model.fit(dataset, steps_per_epoch=len(X_train)//batch_size, epochs=10)

In [0]:
# Strojenie hiperparametrów

# Komunikuje się wyłącznie z serwerem ps
config_proto = tf.ConfigProto(device_filters=['/job:ps', '/job:worker/task:%d' % tf_config['task']['index']])
config = tf.estimator.RunConfig(session_config=config_proto)
# domyślne od wersji 1.10

In [0]:
strategy.num_replicas_in_sync