# TensorFlow in Production

| Date | User | Change Type | Remarks |  
| ---- | ---- | ----------- | ------- |
| 14/03/2025   | Martin | Created   | Created notebook for parallelisation snippets | 
| 16/03/2025   | Martin | Updated   | Completed parallelisation and device assignment | 

# Content

* [Using Multiple Executors](#using-multiple-executors)
* [Parallelising TensorFlow](#parallelising-tensorflow)
* [Saving and Restoring TensorFlow Models](#saving-and-restoring-tensorflow-models)

# Using Multiple Executors

Computational graphs in TensorFlow lend themselves to parallel execution. A graph can be split across different processors, and the data can be processed in separate batches.

_How to access different processors on the same machine?_

TF assigns operations to the available devices automatically via a greedy process, but you can also specify which device each operation runs on with a _device scope_ (`tf.device`).

In [1]:
import tensorflow as tf

2025-03-16 18:16:23.942536: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-03-16 18:16:26.467238: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1742120187.534811    2045 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1742120187.803671    2045 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-03-16 18:16:30.249533: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

In [2]:
print(f"Num GPUs available, {len(tf.config.list_physical_devices('GPU'))}")

Num GPUs available, 1


If a TensorFlow operation is implemented for both CPU and GPU devices, the operation is executed on the GPU by default when one is available.

In [3]:
# To find out where placement occurs, set 'log_device_placement'
tf.debugging.set_log_device_placement(True)

a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3], name='a')
b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2], name='b')
c = tf.matmul(a, b)

Executing op _EagerConst in device /job:localhost/replica:0/task:0/device:GPU:0
input: (_Arg): /job:localhost/replica:0/task:0/device:GPU:0
_EagerConst: (_EagerConst): /job:localhost/replica:0/task:0/device:GPU:0
output_RetVal: (_Retval): /job:localhost/replica:0/task:0/device:GPU:0
input: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
Executing op _EagerConst in device /job:localhost/replica:0/task:0/device:GPU:0
_EagerConst: (_EagerConst): /job:localhost/replica:0/task:0/device:GPU:0
output_RetVal: (_Retval): /job:localhost/replica:0/task:0/device:GPU:0
tensor: (_Arg): /job:localhost/replica:0/task:0/device:GPU:0
shape: (_DeviceArg): /job:localhost/replica:0/task:0/device:CPU:0
Reshape: (Reshape): /job:localhost/replica:0/task:0/device:GPU:0
output_RetVal: (_Retval): /job:localhost/replica:0/task:0/device:GPU:0
Executing op Reshape in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op _EagerConst in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op 

I0000 00:00:1742120386.023824    2045 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 9558 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 4070, pci bus id: 0000:01:00.0, compute capability: 8.9
2025-03-16 18:19:46.058010: I tensorflow/core/common_runtime/placer.cc:162] input: (_Arg): /job:localhost/replica:0/task:0/device:GPU:0
2025-03-16 18:19:46.058039: I tensorflow/core/common_runtime/placer.cc:162] _EagerConst: (_EagerConst): /job:localhost/replica:0/task:0/device:GPU:0
2025-03-16 18:19:46.058043: I tensorflow/core/common_runtime/placer.cc:162] output_RetVal: (_Retval): /job:localhost/replica:0/task:0/device:GPU:0
2025-03-16 18:19:49.739507: I tensorflow/core/common_runtime/placer.cc:162] input: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
2025-03-16 18:19:49.739535: I tensorflow/core/common_runtime/placer.cc:162] _EagerConst: (_EagerConst): /job:localhost/replica:0/task:0/device:GPU:0
2025-03-16 18:19:49.739538: I tensorflow/core/co

In [4]:
# Or use the `device` attribute to find out which device a tensor is on
a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3], name='a')
print(a.device)
b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2], name='b')
print(b.device)

Executing op _EagerConst in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op _EagerConst in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op Reshape in device /job:localhost/replica:0/task:0/device:GPU:0
/job:localhost/replica:0/task:0/device:GPU:0
Executing op _EagerConst in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op _EagerConst in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op Reshape in device /job:localhost/replica:0/task:0/device:GPU:0
/job:localhost/replica:0/task:0/device:GPU:0


## Selecting the device

Select the device to use by creating a device context with the `tf.device` function. Every operation executed inside the context runs on the selected device.

In [5]:
tf.debugging.set_log_device_placement(True)
# Specifies that all operations should be run on the CPU
with tf.device('/device:CPU:0'):
  a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3], name='a')
  b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2], name='b')
  c = tf.matmul(a, b)

tensor: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
shape: (_DeviceArg): /job:localhost/replica:0/task:0/device:CPU:0
Reshape: (Reshape): /job:localhost/replica:0/task:0/device:CPU:0
output_RetVal: (_Retval): /job:localhost/replica:0/task:0/device:CPU:0
Executing op Reshape in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op Reshape in device /job:localhost/replica:0/task:0/device:CPU:0
a: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
b: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
MatMul: (MatMul): /job:localhost/replica:0/task:0/device:CPU:0
product_RetVal: (_Retval): /job:localhost/replica:0/task:0/device:CPU:0
a: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
b: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
_MklMatMul: (_MklMatMul): /job:localhost/replica:0/task:0/device:CPU:0
product_RetVal: (_Retval): /job:localhost/replica:0/task:0/device:CPU:0
Executing op _MklMatMul in device /job:localhost/replica:0/task:0/device:CPU:0


2025-03-16 18:22:15.574271: I tensorflow/core/common_runtime/placer.cc:162] tensor: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
2025-03-16 18:22:15.574303: I tensorflow/core/common_runtime/placer.cc:162] shape: (_DeviceArg): /job:localhost/replica:0/task:0/device:CPU:0
2025-03-16 18:22:15.574310: I tensorflow/core/common_runtime/placer.cc:162] Reshape: (Reshape): /job:localhost/replica:0/task:0/device:CPU:0
2025-03-16 18:22:15.574312: I tensorflow/core/common_runtime/placer.cc:162] output_RetVal: (_Retval): /job:localhost/replica:0/task:0/device:CPU:0
2025-03-16 18:22:15.588232: I tensorflow/core/common_runtime/placer.cc:162] a: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
2025-03-16 18:22:15.588259: I tensorflow/core/common_runtime/placer.cc:162] b: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
2025-03-16 18:22:15.588264: I tensorflow/core/common_runtime/placer.cc:162] MatMul: (MatMul): /job:localhost/replica:0/task:0/device:CPU:0
2025-03-16 18:22:15.588266: I 

In [6]:
tf.debugging.set_log_device_placement(True)
# Creates the tensors on the CPU only; the matmul is outside the context, so it runs on the default device (the GPU here)
with tf.device('/device:CPU:0'):
    a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3], name='a')
    b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2], name='b')
c = tf.matmul(a, b)

Executing op Reshape in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op Reshape in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op MatMul in device /job:localhost/replica:0/task:0/device:GPU:0
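
The device can also be chosen at run time rather than hard-coded. Below is a minimal sketch of that idea: `pick_device` is a hypothetical helper (not part of TensorFlow), and it assumes that falling back to the CPU is acceptable when no GPU is visible.

```python
import tensorflow as tf


def pick_device():
    """Hypothetical helper: prefer the first GPU, otherwise fall back to the CPU."""
    # Physical devices can be queried before the runtime is initialised
    return '/GPU:0' if tf.config.list_physical_devices('GPU') else '/CPU:0'


device_name = pick_device()
with tf.device(device_name):
    a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3], name='a')
    b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2], name='b')
    c = tf.matmul(a, b)

# Requested device vs. the device the result actually lives on
print(device_name, c.device)
```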


## Limiting GPU memory allocation

By default TensorFlow allocates almost all of the GPU memory at start-up and does not release it.

To allocate memory gradually as it is needed instead, use the `tf.config.experimental.set_memory_growth` method or set the environment variable `TF_FORCE_GPU_ALLOW_GROWTH` to `true`

In [7]:
gpu_devices = tf.config.list_physical_devices('GPU')
if gpu_devices:
    try:
        tf.config.experimental.set_memory_growth(gpu_devices[0], True)
    except RuntimeError as e:
        # Memory growth cannot be modified after GPU has been initialized
        print(e)

Physical devices cannot be modified after being initialized
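
The error above appears because the GPU had already been initialised by earlier cells. A sketch of an ordering that avoids it, assuming it runs at the very start of a fresh process (for example right after a kernel restart). Setting both the environment variable and the per-device option is redundant and is done here only to show the two routes side by side.

```python
import os

# Must be set before TensorFlow initialises the GPU
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

import tensorflow as tf

gpu_devices = tf.config.list_physical_devices('GPU')
for gpu in gpu_devices:
    # Must be called before any operation touches the GPU
    tf.config.experimental.set_memory_growth(gpu, True)

# Read the setting back; the list is empty on a CPU-only machine
growth_flags = [tf.config.experimental.get_memory_growth(gpu) for gpu in gpu_devices]
print(growth_flags)
```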


## Using multiple GPUs

Operations can be placed on several devices within the same program. The example below assumes three devices: CPU:0, GPU:0 and GPU:1

In [None]:
# Create 2 virtual GPUs
gpu_devices = tf.config.list_physical_devices('GPU')
tf.debugging.set_log_device_placement(True)
if gpu_devices:
  try:
    tf.config.experimental.set_virtual_device_configuration(
      gpu_devices[0],
      # Create 2 virtual GPUs
      [
        tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024),
        tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)
      ]
    )
  except RuntimeError as e:
    # Virtual devices must be configured before the GPU has been initialised
    print(e)

print("Num GPUs Available: ", len(tf.config.list_logical_devices('GPU')))

# Checks whether this TensorFlow build has CUDA (GPU) support; it does not check that a GPU is actually present
if tf.test.is_built_with_cuda():
  with tf.device('/cpu:0'):
    a = tf.constant([1.0, 3.0, 5.0], shape=[1, 3])
    b = tf.constant([2.0, 4.0, 6.0], shape=[3, 1])

    with tf.device('/gpu:0'):
      c = tf.matmul(a, b)
      c = tf.reshape(c, [-1])

    with tf.device('/gpu:1'):
      d = tf.matmul(b, a)
      flat_d = tf.reshape(d, [-1])

    combined = tf.multiply(c, flat_d)
  
  print(combined)
    

Virtual devices cannot be modified after being initialized
Num GPUs Available:  1
Executing op Reshape in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op Reshape in device /job:localhost/replica:0/task:0/device:CPU:0
Executing op MatMul in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op _EagerConst in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op Reshape in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op MatMul in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op _EagerConst in device /job:localhost/replica:0/task:0/device:GPU:0
Executing op Reshape in device /job:localhost/replica:0/task:0/device:GPU:0
x: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
y: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
Mul: (Mul): /job:localhost/replica:0/task:0/device:CPU:0
z_RetVal: (_Retval): /job:localhost/replica:0/task:0/device:CPU:0
Executing op Mul in device /job:localhost/replica:0/task:0/device:CPU

2025-03-16 18:30:33.518109: I tensorflow/core/common_runtime/placer.cc:162] x: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
2025-03-16 18:30:33.518134: I tensorflow/core/common_runtime/placer.cc:162] y: (_Arg): /job:localhost/replica:0/task:0/device:CPU:0
2025-03-16 18:30:33.518140: I tensorflow/core/common_runtime/placer.cc:162] Mul: (Mul): /job:localhost/replica:0/task:0/device:CPU:0
2025-03-16 18:30:33.518142: I tensorflow/core/common_runtime/placer.cc:162] z_RetVal: (_Retval): /job:localhost/replica:0/task:0/device:CPU:0
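
In the run above both matmuls landed on GPU:0, because the virtual devices were configured after the runtime had been initialised and only one logical GPU existed. The same placement pattern can be tried without any GPU by splitting the CPU into two logical devices. The sketch below assumes a fresh process and uses `tf.config.set_logical_device_configuration`, the non-experimental name of the call used above. If the configuration is rejected, it reuses `/CPU:0` for both branches.

```python
import tensorflow as tf

cpus = tf.config.list_physical_devices('CPU')
try:
    # Split the single physical CPU into two logical devices
    tf.config.set_logical_device_configuration(
        cpus[0],
        [tf.config.LogicalDeviceConfiguration(),
         tf.config.LogicalDeviceConfiguration()]
    )
except RuntimeError as e:
    # Raised when the runtime has already been initialised
    print(e)

n_cpus = len(tf.config.list_logical_devices('CPU'))
second_device = '/CPU:1' if n_cpus > 1 else '/CPU:0'

with tf.device('/CPU:0'):
    a = tf.constant([1.0, 3.0, 5.0], shape=[1, 3])
    b = tf.constant([2.0, 4.0, 6.0], shape=[3, 1])
    c = tf.reshape(tf.matmul(a, b), [-1])          # shape [1]

with tf.device(second_device):
    flat_d = tf.reshape(tf.matmul(b, a), [-1])     # shape [9]

with tf.device('/CPU:0'):
    combined = tf.multiply(c, flat_d)              # broadcasts c over flat_d

print(n_cpus, c.device, flat_d.device)
print(combined)
```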


## Naming conventions

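Devices in TensorFlow can be referred to by a short name (e.g. `/GPU:0`, `/device:CPU:0`) or by a fully qualified name that also includes the job, replica and task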

| Device | Device Name |
| ---- | ---- |
| Main CPU   | `/device:CPU:0` |
| Main GPU   | `/GPU:0` |
| Second GPU   | `/job:localhost/replica:0/task:0/device:GPU:1` |
| Third GPU   | `/job:localhost/replica:0/task:0/device:GPU:2` |

The CPU is treated as a single device. All cores are wrapped into the same CPU device, i.e. TensorFlow uses multiple CPU cores by default
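
The short and fully qualified names describe a device at different levels of detail. A small sketch that parses both forms with `tf.DeviceSpec.from_string`; the two strings are taken from the table above.

```python
import tensorflow as tf

short = tf.DeviceSpec.from_string('/GPU:0')
full = tf.DeviceSpec.from_string('/job:localhost/replica:0/task:0/device:GPU:1')

# The short form only carries the device type and index
print(short.job, short.device_type, short.device_index)

# The fully qualified form also carries job, replica and task
print(full.job, full.replica, full.task, full.device_type, full.device_index)
```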

---

# Parallelising TensorFlow

A distribution strategy speeds up training by spreading it over several devices or machines.

The TensorFlow distribution API (`tf.distribute`) distributes training by replicating the model onto different devices or nodes and training each replica on a different subset of the data.

__Features__

* Supports multiple hardware platforms (CPUs, GPUs and TPUs)
* Supports both synchronous and asynchronous training strategies

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds

In [None]:
# Create two virtual GPUs
gpu_devices = tf.config.list_physical_devices('GPU')
if gpu_devices:
  try:
    tf.config.experimental.set_virtual_device_configuration(
      gpu_devices[0],
      [
        tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024),
        tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)
      ]
    )
  except RuntimeError as e:
    # Virtual devices must be configured before the GPU has been initialized
    print(e)

## Data preparation

In [None]:
datasets, info = tfds.load('mnist', with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']

In [None]:
def normalize_img(image, label):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label

mnist_train = mnist_train.map(
  normalize_img,
  num_parallel_calls=tf.data.experimental.AUTOTUNE
)
mnist_train = mnist_train.cache()
mnist_train = mnist_train.shuffle(info.splits['train'].num_examples)
mnist_train = mnist_train.prefetch(tf.data.experimental.AUTOTUNE)


mnist_test = mnist_test.map(
  normalize_img,
  num_parallel_calls=tf.data.experimental.AUTOTUNE
)
mnist_test = mnist_test.cache()
mnist_test = mnist_test.prefetch(tf.data.experimental.AUTOTUNE)

## Applying strategy

Replicate the model across all GPUs on the same machine. Each replica is trained on a different batch of data, and the replicas are kept in sync (synchronous training)

In [10]:
mirrored_strategy = tf.distribute.MirroredStrategy()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Executing op _EagerConst in device /job:localhost/replica:0/task:0/device:GPU:0
resource_RetVal: (_Retval): /job:localhost/replica:0/task:0/device:GPU:0
VarHandleOp: (VarHandleOp): /job:localhost/replica:0/task:0/device:GPU:0
Executing op VarHandleOp in device /job:localhost/replica:0/task:0/device:GPU:0
resource: (_Arg): /job:localhost/replica:0/task:0/device:GPU:0
value: (_Arg): /job:localhost/replica:0/task:0/device:GPU:0
AssignVariableOp: (AssignVariableOp): /job:localhost/replica:0/task:0/device:GPU:0
Executing op AssignVariableOp in device /job:localhost/replica:0/task:0/device:GPU:0


2025-03-16 18:45:01.810928: I tensorflow/core/common_runtime/placer.cc:162] resource_RetVal: (_Retval): /job:localhost/replica:0/task:0/device:GPU:0
2025-03-16 18:45:01.810963: I tensorflow/core/common_runtime/placer.cc:162] VarHandleOp: (VarHandleOp): /job:localhost/replica:0/task:0/device:GPU:0
2025-03-16 18:45:01.816902: I tensorflow/core/common_runtime/placer.cc:162] resource: (_Arg): /job:localhost/replica:0/task:0/device:GPU:0
2025-03-16 18:45:01.816928: I tensorflow/core/common_runtime/placer.cc:162] value: (_Arg): /job:localhost/replica:0/task:0/device:GPU:0
2025-03-16 18:45:01.816935: I tensorflow/core/common_runtime/placer.cc:162] AssignVariableOp: (AssignVariableOp): /job:localhost/replica:0/task:0/device:GPU:0


In [11]:
print('Number of devices: {}'.format(mirrored_strategy.num_replicas_in_sync))

Number of devices: 1


In [None]:
BATCH_SIZE_PER_REPLICA = 128 # Each GPU receives 128 datapoints per step
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * mirrored_strategy.num_replicas_in_sync # Global batch size = per-replica batch size x number of replicas

mnist_train = mnist_train.batch(BATCH_SIZE)
mnist_test = mnist_test.batch(BATCH_SIZE)
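
To make the batch arithmetic concrete, the sketch below computes the global batch size and the resulting number of training steps per epoch for a few replica counts. It is plain Python and does not touch TensorFlow; `steps_per_epoch` is a hypothetical helper, and 60,000 is the size of the MNIST training split.

```python
import math

NUM_TRAIN_EXAMPLES = 60_000   # size of the MNIST training split
BATCH_SIZE_PER_REPLICA = 128


def steps_per_epoch(num_replicas):
    """Hypothetical helper: returns (global batch size, steps per epoch)."""
    global_batch = BATCH_SIZE_PER_REPLICA * num_replicas
    # The last batch may be smaller, hence the ceiling
    return global_batch, math.ceil(NUM_TRAIN_EXAMPLES / global_batch)


for n in (1, 2, 4):
    print(n, steps_per_epoch(n))
# 1 (128, 469)
# 2 (256, 235)
# 4 (512, 118)
```

Doubling the number of replicas doubles the global batch and roughly halves the number of steps per epoch, which is where the speed-up comes from.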

Define and compile model using the mirrored strategy

In [None]:
# All variables created inside the scope are mirrored across ALL replicas
with mirrored_strategy.scope():
  model = tf.keras.Sequential()
  model.add(tf.keras.layers.Flatten(name="FLATTEN"))
  model.add(tf.keras.layers.Dense(units=128 , activation="relu", name="D1"))
  model.add(tf.keras.layers.Dense(units=64 , activation="relu", name="D2"))
  model.add(tf.keras.layers.Dense(units=10, activation="softmax", name="OUTPUT"))
  
  model.compile(
    optimizer="sgd", 
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
  )

In [None]:
model.fit(
  mnist_train,
  epochs=10,
  validation_data=mnist_test
)

## Distributed strategies

* `TPUStrategy()` - TPU strategy is like mirrored strategy but runs on TPUs
* `MultiWorkerMirroredStrategy()` - Multi-worker mirrored strategy is also similar to mirrored strategy, but the model is trained across several machines, each potentially with multiple GPUs. The cross-device communication method can be specified
* `experimental.CentralStorageStrategy()` - Central storage strategy trains synchronously on one machine with multiple GPUs. Variables are not mirrored but placed on the CPU, and operations are replicated to all local GPUs
* `experimental.ParameterServerStrategy()` - Parameter server strategy is implemented on a cluster of machines. Some machines act as workers and others as parameter servers. Workers compute, and parameter servers store the variables
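
A minimal sketch of selecting a strategy from the hardware that is visible at run time. `choose_strategy` is a hypothetical helper, and the selection rule is an assumption made for illustration. `tf.distribute.OneDeviceStrategy` is not in the list above; it is used here only as a single-device fallback so that the same code also runs on a CPU-only machine.

```python
import tensorflow as tf


def choose_strategy():
    """Hypothetical helper: mirrored strategy if any GPU is visible, else a single CPU device."""
    gpus = tf.config.list_logical_devices('GPU')
    if gpus:
        # Replicates the model on every visible GPU of this machine
        return tf.distribute.MirroredStrategy()
    # Fallback (assumption): place everything on the CPU
    return tf.distribute.OneDeviceStrategy('/CPU:0')


strategy = choose_strategy()
print(type(strategy).__name__, strategy.num_replicas_in_sync)
```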

📜 __NOTE:__ The TensorFlow distribution API works better in graph mode than in eager mode
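
One way to run a distributed step in graph mode is to wrap it in `tf.function`. The sketch below is a toy example under stated assumptions, not a training loop: the variable `w`, the function names and the input value are illustrative. It runs a per-replica computation with `strategy.run` and averages the results across replicas with `strategy.reduce`.

```python
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    # Created inside the scope, so it is mirrored on every replica
    w = tf.Variable(2.0)


@tf.function  # traces the step into a graph instead of running it eagerly
def distributed_step(x):
    def replica_fn(x):
        # Executed once per replica
        return w * x

    per_replica = strategy.run(replica_fn, args=(x,))
    # Average the per-replica results into a single tensor
    return strategy.reduce(tf.distribute.ReduceOp.MEAN, per_replica, axis=None)


result = distributed_step(tf.constant(3.0))
print(float(result))
```

Because every replica receives the same input here, the mean equals the single-replica result regardless of how many devices are present.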