In [1]:
from pathlib import Path

import ray
import tensorflow as tf
from keras._tf_keras import keras
from ray.air import RunConfig, ScalingConfig
from ray.air.integrations.keras import ReportCheckpointCallback
from ray.train.tensorflow import TensorflowTrainer

2024-12-04 19:11:25.848039: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1733310685.865833   29662 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1733310685.870802   29662 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-12-04 19:11:25.889124: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Report the GPUs TensorFlow can see in this (driver) process; without an
# explicit print the list would be computed and silently discarded.
visible_gpus = tf.config.list_physical_devices("GPU")
print(f"TensorFlow-visible GPUs: {visible_gpus}")
# Start a local Ray instance with explicit resources. ignore_reinit_error lets
# this cell be re-run in the same kernel instead of raising on a second init.
# The RayContext is the last expression, so Jupyter renders it as the output.
ray.init(num_cpus=4, num_gpus=1, ignore_reinit_error=True)

2024-12-04 19:11:31,639	INFO worker.py:1812 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


0,1
Python version:,3.12.8
Ray version:,2.40.0
Dashboard:,http://127.0.0.1:8265


In [3]:
# Seed used for the deterministic train/validation split.
RANDOM_SEED = 709

# All paths are resolved relative to the current working directory.
BASE_DIR = Path().resolve()
DATA_DIR = BASE_DIR / "data"
TRAIN_DIR = DATA_DIR / "train"
TEST_DIR = DATA_DIR / "test"
# Class sub-directory names under TRAIN_DIR.
CATEGORIES = ["chihuahua", "muffin"]
# Model input shape in Keras' (height, width, channels) order. Unpack in that
# same order so img_h / img_w stay correct if the image is ever non-square
# (the loader uses image_size=(img_h, img_w)).
img_h, img_w, img_ch = IMAGE_TARGET_SIZE = (224, 224, 3)
# Default per-worker batch size and epoch count used by the training loop.
BATCH_SIZE = 64
EPOCHS = 3

## Load data

In [5]:
def generate_dataset(batch_size: int) -> tuple[tf.data.Dataset, tf.data.Dataset]:
    """Build the cached, prefetched training and validation pipelines.

    Both splits are read from TRAIN_DIR: 80% of the images go to training and
    20% to validation, split deterministically with RANDOM_SEED.

    Args:
        batch_size: Number of images per batch (the global batch size when
            called from a distributed training loop).

    Returns:
        A ``(train_ds, val_ds)`` tuple of batched ``(image, label)`` datasets,
        with images resized to ``(img_h, img_w)``.
    """
    # subset="both" indexes the directory once and returns both splits from
    # the same seeded file ordering, instead of two copy-pasted loader calls.
    train_ds, val_ds = keras.utils.image_dataset_from_directory(
        TRAIN_DIR,
        # Pin the label mapping to the declared classes (chihuahua=0,
        # muffin=1); unexpected sub-directories raise instead of adding labels.
        class_names=CATEGORIES,
        # float32 0/1 labels, matching a single-logit BinaryCrossentropy head.
        label_mode="binary",
        validation_split=0.2,
        subset="both",
        seed=RANDOM_SEED,
        image_size=(img_h, img_w),
        batch_size=batch_size,
    )
    # cache() keeps the batches in memory after the first pass; prefetch()
    # overlaps input preparation with training steps.
    # NOTE(review): cache() sits after the loader's internal shuffle, so later
    # epochs likely replay the first epoch's batch order — consider adding a
    # .shuffle() after cache() for the training split.
    train_ds = train_ds.cache().prefetch(tf.data.AUTOTUNE)
    val_ds = val_ds.cache().prefetch(tf.data.AUTOTUNE)

    return train_ds, val_ds

## Model definition

In [6]:
def build_model():
    """Return a small, uncompiled CNN that emits one raw logit per image.

    The final Dense layer has no activation, so the model must be trained with
    a loss that expects logits. The Keras layer summary is printed before the
    model is returned.
    """
    layer_stack = [
        keras.layers.Input(IMAGE_TARGET_SIZE),
        # Scale pixel values by 1/255 (0..255 -> 0..1).
        keras.layers.Rescaling(1 / 255),
        keras.layers.Conv2D(32, 3, activation="relu"),
        # NOTE: for a 224x224 input the 3x3 "valid" conv yields 222x222x32, so
        # Flatten feeds ~1.58M features into Dense(128), i.e. ~202M weights.
        keras.layers.Flatten(),
        keras.layers.Dense(128, activation="relu"),
        # Single unit, no activation: a raw logit for the positive class.
        keras.layers.Dense(1),
    ]
    classifier = keras.Sequential(layer_stack)
    classifier.summary()
    return classifier

In [10]:
def train_func(config: dict):
    """Per-worker training loop executed by Ray's TensorflowTrainer.

    Builds the datasets and the model under a MultiWorkerMirroredStrategy and
    runs ``model.fit`` with Ray reporting, best-``val_loss`` weight
    checkpointing and TensorBoard logging.

    Args:
        config: Optional overrides:
            ``batch_size``      per-worker batch size (default BATCH_SIZE);
            ``epochs``          number of epochs (default EPOCHS);
            ``steps_per_epoch`` batches per epoch (default None: one full pass
                                over the training split each epoch);
            ``learning_rate``   SGD learning rate (default 1e-3);
            ``histogram_freq``  TensorBoard histogram frequency in epochs
                                (default 5).

    Returns:
        ``History.history``: metric name -> list of per-epoch values.
    """
    per_worker_batch_size = config.get("batch_size", BATCH_SIZE)
    epochs = config.get("epochs", EPOCHS)
    # Default None = consume the whole finite, non-repeated training split each
    # epoch. A fixed count (previously 70) is not tied to the dataset size and
    # can exceed the number of batches, making Keras stop with "ran out of data".
    steps_per_epoch = config.get("steps_per_epoch")
    learning_rate = config.get("learning_rate", 1e-3)
    histogram_freq = config.get("histogram_freq", 5)

    # TF recommends constructing MultiWorkerMirroredStrategy before other TF
    # calls, since it configures collective ops for the process.
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    # Seed Python, NumPy and TF inside this worker process so weight
    # initialization is reproducible across runs.
    keras.utils.set_random_seed(RANDOM_SEED)
    # Derive the replica count from the strategy instead of hard-coding 1, so
    # the global batch stays correct when ScalingConfig.num_workers changes
    # (one replica per worker when each worker has a single device).
    global_batch_size = per_worker_batch_size * strategy.num_replicas_in_sync

    # NOTE(review): with more than one worker, every worker writes to the same
    # checkpoint and TensorBoard paths — confirm this is acceptable or restrict
    # these callbacks to the chief worker.
    checkpoint = keras.callbacks.ModelCheckpoint(
        str(BASE_DIR / ".checkpoints" / "muffin.weights.h5"),
        monitor="val_loss",
        verbose=True,
        save_best_only=True,
        save_weights_only=True,
        mode="min",
    )
    tensorboard = keras.callbacks.TensorBoard(
        str(BASE_DIR / ".tensorboard"),
        # Histograms are written every `histogram_freq` epochs; with fewer
        # epochs than that (e.g. EPOCHS=3 vs the default 5) none are written.
        histogram_freq=histogram_freq,
        write_images=True,
    )

    train_ds, val_ds = generate_dataset(global_batch_size)

    with strategy.scope():
        sgd = keras.optimizers.SGD(learning_rate)
        # The model's last layer is Dense(1) with no activation (a raw logit),
        # so the loss applies the sigmoid itself.
        loss = keras.losses.BinaryCrossentropy(from_logits=True)
        model = build_model()

        model.compile(
            loss=loss,
            optimizer=sgd,
            # On logits the decision boundary is 0, not the 0.5 threshold the
            # "accuracy" shortcut applies. Keep the name "accuracy" so the
            # logged metric keys (accuracy / val_accuracy) are unchanged.
            metrics=[keras.metrics.BinaryAccuracy(name="accuracy", threshold=0.0)],
        )

    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        callbacks=[ReportCheckpointCallback(), checkpoint, tensorboard],
        verbose=True,
    )
    return history.history

In [11]:
def train_tensorflow(num_workers: int = 1, use_gpu: bool = True):
    """Run ``train_func`` under Ray Train and block until training finishes.

    Args:
        num_workers: Number of Ray Train workers to launch (default 1, as
            before).
        use_gpu: Whether each worker requests a GPU (default True, as before).
            NOTE(review): the local Ray instance in this notebook is started
            with num_gpus=1, so more GPU workers than that cannot be scheduled.

    Returns:
        Whatever ``TensorflowTrainer.fit()`` returns (a Ray Train result).
    """
    config = {
        "learning_rate": 1e-3,
        "batch_size": BATCH_SIZE,
        "epochs": EPOCHS,
    }

    trainer = TensorflowTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        # Run outputs (results, reported checkpoints) go under BASE_DIR/.storage.
        run_config=RunConfig(storage_path=str(BASE_DIR / ".storage")),
    )
    return trainer.fit()

In [None]:
# Launch training on Ray; the returned result is the cell's last expression,
# so Jupyter renders it below.
results = train_tensorflow()
results

2024-12-04 19:13:18,559	INFO tune.py:616 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949


== Status ==
Current time: 2024-12-04 19:13:18 (running for 00:00:00.11)
Using FIFO scheduling algorithm.
Logical resource usage: 1.0/4 CPUs, 1.0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2024-12-04_19-11-28_825774_29662/artifacts/2024-12-04_19-13-18/TensorflowTrainer_2024-12-04_19-13-18/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2024-12-04 19:13:23 (running for 00:00:05.21)
Using FIFO scheduling algorithm.
Logical resource usage: 1.0/4 CPUs, 1.0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2024-12-04_19-11-28_825774_29662/artifacts/2024-12-04_19-13-18/TensorflowTrainer_2024-12-04_19-13-18/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2024-12-04 19:13:28 (running for 00:00:10.23)
Using FIFO scheduling algorithm.
Logical resource usage: 1.0/4 CPUs, 1.0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: /tmp/ray/session_2024-12-04_19-11-28_825774_29662/artifacts/2024-12