We've used `model.save()` before; now, to version the model, we can create a subdirectory for each model version.

In [None]:
from pathlib import Path
import tensorflow as tf
from tensorflow import keras
import numpy as np

def _scale_and_add_channel(images):
    """Return `images` as float32 divided by 255, with a trailing channel axis appended."""
    scaled = images.astype("float32") / 255.0
    return np.expand_dims(scaled, axis=-1)


# Fetch MNIST as (images, labels) pairs for the training and test splits
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()

# Rescale pixel values (0-255) to [0, 1] and add the channel dimension the CNN expects
X_train_full = _scale_and_add_channel(X_train_full)
X_test = _scale_and_add_channel(X_test)

# Everything from index 50000 onward is held out as the validation set
split_at = 50000
X_train, y_train = X_train_full[:split_at], y_train_full[:split_at]
X_valid, y_valid = X_train_full[split_at:], y_train_full[split_at:]

# Build a simple CNN model.
# The input shape is declared with an explicit `keras.Input` object rather than the
# `input_shape=` layer argument, which Keras 3 flags as deprecated for Sequential models.
model = keras.Sequential([
    keras.Input(shape=(28, 28, 1)),  # 28x28 images with a single channel
    keras.layers.Conv2D(32, kernel_size=3, activation="relu"),
    keras.layers.MaxPooling2D(pool_size=2),
    keras.layers.Conv2D(64, kernel_size=3, activation="relu"),
    keras.layers.MaxPooling2D(pool_size=2),
    keras.layers.Flatten(),
    keras.layers.Dense(128, activation="relu"),
    keras.layers.Dense(10, activation="softmax"),  # Output layer for 10 classes
])

# Compile the model; the sparse loss takes the integer class labels directly
model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# Train the model, reporting loss/accuracy on the validation split after each epoch
model.fit(X_train, y_train, epochs=5, validation_data=(X_valid, y_valid))

# Persist the trained model as <model_name>/<model_version>.keras
model_name, model_version = "my_mnist_model", "0001"
model_path = Path(model_name) / f"{model_version}.keras"
model_dir = model_path.parent
model_dir.mkdir(parents=True, exist_ok=True)  # no-op when the directory is already there
model.save(model_path)

metagraph

In [16]:
# !saved_model_cli show --dir ./my_mnist_model/0001.keras

In [20]:
# !saved_model_cli show --dir ./my_mnist_model/0001.keras --tag_set serve

In [22]:
# !saved_model_cli show --dir ./my_mnist_model/0001.keras --tag_set serve --signature_def serving_default

using Docker locally:

`docker pull tensorflow/serving`

`docker run -p 8501:8501 --name=tf_serving --mount type=bind,source="$(pwd)/my_mnist_model",target=/models/my_mnist_model -e MODEL_NAME=my_mnist_model -t tensorflow/serving`

In [24]:
# url = "https://storage.googleapis.com/tensorflow-serving-apt"
# src = "stable tensorflow-model-server tensorflow-model-server-universal"
# !echo 'deb {url} {src}' > /etc/apt/sources.list.d/tensorflow-serving.list
# !curl '{url}/tensorflow-serving.release.pub.gpg' | apt-key add -
# !apt update -q && apt-get install -y tensorflow-model-server
# !pip install -q -U tensorflow-serving-api

In [27]:
import os

# Publish the absolute path of the directory that contains the saved model file,
# so shell cells can read it as ${MODEL_DIR}
model_base_dir = model_path.parent.absolute()
os.environ["MODEL_DIR"] = str(model_base_dir)

Starting the server using the `MODEL_DIR` environment variable

In [28]:
# %%bash --bg
# tensorflow_model_server \
# --port=8500 \
# --rest_api_port=8501 \
# --model_name=my_mnist_model \
# --model_base_path="${MODEL_DIR}" >my_server.log 2>&1

In [30]:
# convert input images from NumPy array to Python list
import json

# X_new = X_test[:3] # pretending we have 3 new digit images to classify
# request_json = json.dumps({
#     "signature_name": "serving_default",
#     "instances": X_new.tolist(),
# })

# request_json

Send this request to TF Serving via an HTTP POST request:

In [31]:
import requests

# server_url = "http://localhost:8501/v1/models/my_mnist_model:predict"
# response = requests.post(server_url, data=request_json)
# response.raise_for_status()
# response = response.json()

The corresponding value is the list of predictions. This list is a Python list, so let’s convert it to a NumPy array 

In [32]:
import numpy as np

# y_proba = np.array(response["predictions"])
# y_proba.round(2)

### gRPC 

gRPC is the better choice when transferring large amounts of data or when latency is important.

It uses HTTP/2 and a compact binary format.

gRPC API expects a serialized `PredictRequest` protocol buffer as input, and it outputs a serialized `PredictResponse` protocol buffer. protobufs are part of the tensorflow-serving-api library

In [35]:
# from tensorflow_serving.apis.predict_pb2 import PredictRequest

# request = PredictRequest()
# request.model_spec.name = model_name
# request.model_spec.signature_name = "serving_default"
# input_name = model.input_names[0]
# request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))

Creating a gRPC communication channel to localhost on TCP port 8500:

In [36]:
# import grpc
# from tensorflow_serving.apis import prediction_service_pb2_grpc

# channel = grpc.insecure_channel('localhost:8500')
# predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)
# response = predict_service.Predict(request, timeout=10.0)

Convert the `PredictResponse` protocol buffer to a tensor:

In [37]:
# output_name = model.output_names[0]
# outputs_proto = response.outputs[output_name]
# y_proba = tf.make_ndarray(outputs_proto)

The above allows us to access our TensorFlow model remotely, using either REST or gRPC

In [38]:
# Save the model again as a second version inside `model_dir`
model_version = "0002"
model_path = model_dir.joinpath(f"{model_version}.keras")
model.save(model_path)

using code in a Google CoLab notebook:

In [39]:
# from google.colab import auth

# auth.authenticate_user()

In [40]:
# from pathlib import Path

# def upload_directory(bucket, dirpath):
#   dirpath = Path(dirpath)
#   for filepath in dirpath.glob("**/*"):
#     if filepath.is_file():
#       blob = bucket.blob(filepath.relative_to(dirpath.parent).as_posix())
#       blob.upload_from_filename(filepath)

# upload_directory(bucket, "my_mnist_model")

upload `0001.keras` model to google cloud storage by creating a bucket and enabling Vertex AI API

had to save a .pb model format when uploading container:

In [42]:
# import tensorflow as tf
# from google.cloud import storage

# model = tf.keras.models.load_model(f"gs://{bucket_name}/my_mnist_model/0001.keras")

# saved_model_path = f"gs://{bucket_name}/my_mnist_model"

# tf.saved_model.save(model, saved_model_path) 

then this finds the bucket and uploads the `saved_model.pb` file

In [43]:
# from google.cloud import aiplatform

# server_image = "gcr.io/cloud-aiplatform/prediction/tf2-gpu.2-8:latest"

# aiplatform.init(project=project_id, location=location)
# mnist_model = aiplatform.Model.upload(
#     display_name="mnist",
#     artifact_uri=f"gs://{bucket_name}/my_mnist_model",
#     serving_container_image_uri=server_image,
# )

In [44]:
# response = endpoint.predict(instances=X_new.tolist())

In [None]:
# import numpy as np
# np.round(response.predictions, 2)

In [45]:
# endpoint.undeploy_all() # undeploy all models from the endpoint
# endpoint.delete()

If we have a large number of predictions to make, then instead of calling our prediction service repeatedly, we can ask Vertex AI to run a prediction job for us. This does not require an endpoint, only a model. Create a file containing one instance per line, each formatted as a JSON value (this format is called JSON Lines), then pass this file to Vertex AI. So let's create a JSON Lines file in a new directory, then upload this directory to GCS:

In [48]:
# batch_path = Path("my_mnist_batch")
# batch_path.mkdir(exist_ok=True)
# with open(batch_path / "my_mnist_batch.jsonl", "w") as jsonl_file:
#     for image in X_test[:100].tolist():
#         jsonl_file.write(json.dumps(image))
#         jsonl_file.write("\n")

# upload_directory(bucket, batch_path)

use the json line file to run batch operations

In [50]:
# batch_prediction_job = mnist_model.batch_predict(
#     job_display_name="mnist-batch-prediction-job",
#     machine_type="n1-standard-4",
#     starting_replica_count=1,
#     max_replica_count=5,
#     accelerator_type="NVIDIA_TESLA_K80",
#     accelerator_count=1,
#     gcs_source=f"gs://{bucket_name}/{batch_path.name}/my_mnist_batch.jsonl",
#     gcs_destination_prefix=f"gs://{bucket_name}/my_mnist_predictions/",
#     sync=True # set to False if you don't want to wait for completion
# )

iterate through all predictions in output files:

In [51]:
# y_pred = np.argmax(y_pred, axis=1)
# accuracy = np.sum(y_pred == y_test[:100]) / 100

Delete the directories we created in the GCS bucket (and optionally the bucket itself, if it is empty):

In [52]:
# for prefix in ["my_mnist_model/", "my_mnist_batch/", "my_mnist_predictions/"]:
#     blobs = bucket.list_blobs(prefix=prefix)
#     for blob in blobs:
#         blob.delete()

# bucket.delete() # if bucket is empty
# batch_prediction_job.delete()

convert a savedmodel to a flatbuffer and save it as a .tflite

In [53]:
# converter = tf.lite.TFLiteConverter.from_saved_model(str(model_path))
# tflite_model = converter.convert()
# with open("my_converted_savedmodel.tflite", "wb") as f:
#     f.write(tflite_model)

In [54]:
# tf.lite.TFLiteConverter.from_keras_model(model)

JS Code for running tensorflow in browser:

In [55]:
# import "https://cdn.jsdelivr.net/npm/@tensorflow/tfjs@latest";
# import "https://cdn.jsdelivr.net/npm/@tensorflow-models/mobilenet@1.0.0";

# const image = document.getElementById("image");
# mobilenet.load().then(model => {
#     model.classify(image).then(predictions => {
#         predictions.forEach((pred, i) => {
#             let className = pred.className
#             let proba = (pred.probability * 100).toFixed(1)
#             console.log(`${className} : ${proba}%`)
#         })
#     })
# })

In [56]:
# Ask TensorFlow for the physical GPU devices it can see, then display the list
physical_gpus = tf.config.list_physical_devices(device_type="GPU")
physical_gpus

[]

Training multiple models on multiple GPUs (split up the RAM)

In [58]:
# in one terminal:
# CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES=0,1 python3 program_1.py

# in another terminal:
# CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES=3,2 python3 program_2.py

allocate specific amount of GPU RAM:

In [59]:
# for gpu in physical_gpus:
#     tf.config.set_logical_device_configuration(
#         gpu,
#         [tf.config.LogicalDeviceConfiguration(memory_limit=2048)]
#     )

allocate memory only when GPU needs it (must be done immediately after importing TensorFlow)

In [60]:
# for gpu in physical_gpus:
#     tf.config.experimental.set_memory_growth(gpu, True)

Split on two or more logical devices (after importing TensorFlow)

In [61]:
# tf.config.set_logical_device_configuration(
#     physical_gpus[0],
#     [tf.config.LogicalDeviceConfiguration(memory_limit=2048),
#      tf.config.LogicalDeviceConfiguration(memory_limit=2048)]
# )

You generally want to place the data preprocessing operations on the CPU, and place the neural network operations on the GPUs.

In [63]:
# float32 variable: goes to the GPU (in my case the CPU, on an M1 Mac)
a = tf.Variable([1.0, 2.0, 3.0])
a.device

'/job:localhost/replica:0/task:0/device:CPU:0'

In [64]:
# int32 variable: goes to the CPU
b = tf.Variable([1, 2, 3])
b.device

'/job:localhost/replica:0/task:0/device:CPU:0'

M1 Mac has a multithreaded kernel so any operation can be run in parallel across multiple cores!

Operations in the CPU's evaluation queue are dispatched to a thread pool called the inter-op thread pool.

Multithreaded CPU kernels split their work into sub-operations, which are executed by a second thread pool called the intra-op thread pool (shared by all multithreaded CPU kernels).

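Control the number of threads in the inter-op thread pool by calling
`tf.config.threading.set_inter_op_parallelism_threads()`, and the number of threads in the intra-op thread pool by calling `tf.config.threading.set_intra_op_parallelism_threads()`.
This is useful if you do not want TensorFlow to use all the CPU cores or if you want it to be single-threaded.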

You could train a model on a single GPU and perform all the preprocessing in parallel on the CPU, using the dataset's prefetch() method

CNNs contain layers that are only partially connected to the lower layers, so it's much easier to distribute chunks across devices efficiently.

A deep RNN can be split up efficiently across multiple GPUs if it is split horizontally (one layer per device).

Split up the data and train on a different subset of the data per GPU (following the mirrored strategy).

There are a few ways you can reduce the effect of stale gradients:
- Reduce the learning rate.
- Drop stale gradients or scale them down.
- Adjust the mini-batch size.
- Start the first few epochs using just one replica (this is called the warmup phase).

Stale gradients tend to be more damaging at the beginning of training, when gradients are typically large and the parameters have not settled into a valley of the cost function yet, so different replicas may push the parameters in quite different directions.

Pipeline parallelism combines model parallelism and data parallelism: the model is chopped into consecutive parts, called stages, each of which is trained on a different machine. This results in an asynchronous pipeline in which all machines work in parallel with very little idle time.

In [69]:
# using data parallelism with the mirrored strategy
from tensorflow import keras
from tensorflow.keras import layers
# Synchronous data parallelism across the devices TensorFlow selects for this strategy
strategy = tf.distribute.MirroredStrategy()

# The model has to be created and compiled inside the strategy's scope.
with strategy.scope():
    model = keras.Sequential([
        # Declare the input explicitly (instead of the deprecated `input_shape=` layer
        # argument) and match the (28, 28, 1) images prepared earlier in the notebook.
        keras.Input(shape=(28, 28, 1)),
        layers.Flatten(),
        layers.Dense(128, activation="relu"),
        layers.Dense(10, activation="softmax"),
    ])

    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )

# Batch size passed to `fit`.
# NOTE(review): with several replicas this should presumably be divisible by the
# number of replicas — confirm against the tf.distribute documentation.
batch_size = 100
model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid), batch_size=batch_size)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
Epoch 1/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.8166 - loss: 0.6405 - val_accuracy: 0.9481 - val_loss: 0.1906
Epoch 2/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.9487 - loss: 0.1788 - val_accuracy: 0.9613 - val_loss: 0.1400
Epoch 3/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.9625 - loss: 0.1286 - val_accuracy: 0.9665 - val_loss: 0.1196
Epoch 4/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.9722 - loss: 0.0970 - val_accuracy: 0.9682 - val_loss: 0.1063
Epoch 5/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.9774 - loss: 0.0781 - val_accuracy: 0.9702 - val_loss: 0.1005
Epoch 6/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - 

2025-02-17 08:37:54.603644: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
	 [[{{node MultiDeviceIteratorGetNextFromShard}}]]


[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.9846 - loss: 0.0538 - val_accuracy: 0.9740 - val_loss: 0.0866
Epoch 8/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.9887 - loss: 0.0437 - val_accuracy: 0.9750 - val_loss: 0.0859
Epoch 9/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.9907 - loss: 0.0359 - val_accuracy: 0.9752 - val_loss: 0.0817
Epoch 10/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.9919 - loss: 0.0300 - val_accuracy: 0.9748 - val_loss: 0.0875


<keras.src.callbacks.history.History at 0x30b969850>

In [71]:
# Class of the first model weight; if GPU: tensorflow.python.distribute.values.MirroredVariable
first_weight = model.weights[0]
type(first_weight)

keras.src.backend.Variable

To load a model and run it on all available devices, you must call `tf.keras.models.load_model()` within a distribution context:

In [None]:
# NOTE: the snippets below are alternative ways of obtaining a `strategy`; each
# assignment replaces the previous one.

# Load a saved model inside the current strategy's scope.
# NOTE(review): no visible cell saves "my_mirrored_model" — confirm the path exists
# (and has a format `load_model` accepts) before running this.
with strategy.scope():
    model = tf.keras.models.load_model("my_mirrored_model")

# subset of all available GPU devices
strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

# data parallelism with centralized parameters
strategy = tf.distribute.experimental.CentralStorageStrategy()

# TPUStrategy (right after importing TensorFlow)
resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
tf.tpu.experimental.initialize_tpu_system(resolver)
# `tf.distribute.experimental.TPUStrategy` is deprecated; use the stable class instead
strategy = tf.distribute.TPUStrategy(resolver)

Vertex AI also includes an AutoML service, which completely takes care of finding the right model architecture and training it for you.

YouTube Channels to checkout:
Yannic Kilcher, Letitia Parcalabescu, and Xander Steenbrugge