In [1]:
!pip install -q tensorflow_datasets

In [2]:
import os
import json
import tensorflow as tf
import tensorflow_datasets as tfds
# import warnings
# warnings.simplefilter("ignore")
# os.environ["PYTHONWARNINGS"] = "ignore"

In [3]:
def build_and_compile_model():
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10)
    ])
    model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  optimizer=tf.keras.optimizers.Adam(),
                  metrics=['accuracy'])
    return model

@tfds.decode.make_decoder(output_dtype=tf.float32)
def decode_image(example, feature):
    return tf.cast(feature.decode_example(example), dtype=tf.float32) / 255

In [4]:
print("TensorFlow version: ", tf.__version__)

tf_config = os.environ.get('TF_CONFIG', '{}')
print(f"TF_CONFIG [{tf_config}]")
tf_config_json = json.loads(tf_config)
cluster = tf_config_json.get('cluster')
job_name = tf_config_json.get('task', {}).get('type')
task_index = tf_config_json.get('task', {}).get('index')
print(f"cluster=[{cluster}] job_name=[{job_name}] task_index=[{task_index}]")

TensorFlow version:  2.4.1
TF_CONFIG [{}]
cluster=[None] job_name=[None] task_index=[None]


In [5]:
BATCH_SIZE = 64

tb_dir = '/data/logs'
# model_dir = '/app/data/export'
# version = 1
# export_dir = os.path.join(model_dir, str(version))

In [6]:
mnist = tfds.builder('mnist', data_dir='/data/dataset')
mnist.download_and_prepare()

In [7]:
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead


Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead


INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)


INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)


INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO


INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO


In [8]:
mnist_train, mnist_test = mnist.as_dataset(
    split=['train', 'test'],
    decoders={'image': decode_image()},
    as_supervised=True
)
train_input_dataset = mnist_train.cache().repeat().shuffle(buffer_size=50000).batch(BATCH_SIZE)
# eval_input_dataset = mnist_test.cache().repeat().batch(BATCH_SIZE)

train_input_dataset = train_input_dataset.with_options(options)

In [9]:
# https://www.tensorflow.org/guide/keras/custom_callback
class MyCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        keys = list(logs.keys())
        print("End epoch {} of training; got log keys: {}".format(epoch, keys))
        
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=tb_dir),
    # MyCallback()
]

In [10]:
print("Training...")

with strategy.scope():
    multi_worker_model = build_and_compile_model()

num_train_examples = mnist.info.splits['train'].num_examples
train_steps = num_train_examples // BATCH_SIZE
train_epochs = 10

history = multi_worker_model.fit(
    train_input_dataset, 
    epochs = train_epochs, 
    steps_per_epoch = train_steps, 
    callbacks = callbacks
)

print("\\ntraining_history:", history.history)

Training...
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
\ntraining_history: {'loss': [0.3383876085281372, 0.16476000845432281, 0.11984053254127502, 0.10162575542926788, 0.0837550014257431, 0.07131215929985046, 0.06316737085580826, 0.058996181935071945, 0.052086569368839264, 0.04844282567501068], 'accuracy': [0.9027481079101562, 0.9525246620178223, 0.9644643664360046, 0.9689667820930481, 0.9748198986053467, 0.9778715372085571, 0.9799059629440308, 0.9811399579048157, 0.9833577871322632, 0.9842749238014221]}


In [11]:
os.environ.update({
    'S3_ENDPOINT'          : 'minio-service.kubeflow:9000',
    'AWS_ACCESS_KEY_ID'    : 'minio',
    'AWS_SECRET_ACCESS_KEY': 'minio123',
    'S3_USE_HTTPS'         : '0',                     
    'S3_VERIFY_SSL'        : '0'
})  

import uuid
uid = uuid.uuid4().hex[:8]
MODEL_URI=f"s3://model/mnist-{uid}/1"
# MODEL_URI="s3://model/mnist/1"
multi_worker_model.save(MODEL_URI)
print(f"Job [{uid}] Completed ^_____^")

INFO:tensorflow:Assets written to: s3://model/mnist/1/assets


INFO:tensorflow:Assets written to: s3://model/mnist/1/assets
