[Train] [Tune] Refactor MLflow (#20802)
Pulls out Tune's MLflow logging logic to a shared MLflow util.
Adds an MLflow logger callback to Ray Train.

Closes #20642
amogkam committed Dec 22, 2021
1 parent 09421a4 commit 57db464
Showing 23 changed files with 838 additions and 453 deletions.
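For context, the resulting Train and Tune APIs can be used side by side. A minimal sketch, not taken from this PR: the experiment names are arbitrary, the Train-side constructor arguments follow the diff below, and the Tune-side import path is the pre-existing integration.

```python
from ray import train, tune
from ray.train import Trainer
from ray.train.callbacks import MLflowLoggerCallback  # new Train-side callback
from ray.tune.integration.mlflow import (
    MLflowLoggerCallback as TuneMLflowLoggerCallback,  # existing Tune-side callback
)


def train_func():
    for i in range(3):
        train.report(epoch=i)  # forwarded to MLflow by the callback


# Ray Train: attach the new callback to a Trainer run.
trainer = Trainer(backend="torch", num_workers=2)
trainer.start()
trainer.run(
    train_func,
    callbacks=[MLflowLoggerCallback(experiment_name="train_experiment")],
)
trainer.shutdown()


def tune_func(config):
    tune.report(score=config["x"])  # logged to MLflow per trial


# Ray Tune: the analogous callback, now backed by the same shared MLflow util.
tune.run(
    tune_func,
    config={"x": tune.grid_search([1, 2])},
    callbacks=[TuneMLflowLoggerCallback(experiment_name="tune_experiment")],
)
```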
7 changes: 7 additions & 0 deletions .buildkite/pipeline.ml.yml
@@ -318,6 +318,13 @@
    - DATA_PROCESSING_TESTING=1 PYTHON=3.7 ./ci/travis/install-dependencies.sh
    - bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/workflow/... python/ray/data/...

- label: ":slot_machine: ML Utils tests"
conditions: ["RAY_CI_ML_UTILS_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 ./ci/travis/install-dependencies.sh
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/util/ml_utils/...

- label: ":book: Doc tests and examples"
conditions:
["RAY_CI_PYTHON_AFFECTED", "RAY_CI_TUNE_AFFECTED", "RAY_CI_DOC_AFFECTED"]
1 change: 1 addition & 0 deletions ci/travis/determine_tests_to_run.py
@@ -148,6 +148,7 @@ def get_commit_range():
    RAY_CI_SGD_AFFECTED = 1
    RAY_CI_TUNE_AFFECTED = 1
    RAY_CI_RLLIB_AFFECTED = 1
    RAY_CI_ML_UTILS_AFFECTED = 1
elif re.match("^(python/ray/)?rllib/", changed_file):
    RAY_CI_RLLIB_AFFECTED = 1
    RAY_CI_RLLIB_DIRECTLY_AFFECTED = 1
33 changes: 6 additions & 27 deletions doc/examples/datasets_train/datasets_train.py
@@ -25,36 +25,11 @@
from ray import train
from ray.data.aggregate import Mean, Std
from ray.train import Trainer
from ray.train import TrainingCallback
from ray.train.callbacks.logging import MLflowLoggerCallback
from ray.train.callbacks import TBXLoggerCallback
from torch.nn.parallel import DistributedDataParallel


# TODO(amogkam): Upstream this into Ray Train.
class MLflowCallback(TrainingCallback):
    def __init__(self, config):
        self.config = config

    def handle_result(self, results, **info):
        # For each result that's being reported by ``train.report()``,
        # we get the result from the rank 0 worker (i.e. first worker) and
        # report it to MLflow.
        rank_zero_results = results[0]
        mlflow.log_metrics(rank_zero_results)

    # TODO: fix type hint for logdir
    def start_training(self, logdir, **info):
        mlflow.start_run(run_name=str(logdir.name))
        mlflow.log_params(config)

        # TODO: Update TrainCallback to provide logdir in finish_training.
        self.logdir = logdir

    def finish_training(self, error: bool = False, **info):
        # Save the Trainer checkpoints as artifacts to mlflow.
        mlflow.log_artifacts(self.logdir)

def make_and_upload_dataset(dir_path):

    import random
@@ -641,7 +616,11 @@ def train_func(config):
# and should also create 1 directory per file.
tbx_logdir = "./runs"
os.makedirs(tbx_logdir, exist_ok=True)
callbacks = [TBXLoggerCallback(logdir=tbx_logdir), MLflowCallback(config)]
callbacks = [
    TBXLoggerCallback(logdir=tbx_logdir),
    MLflowLoggerCallback(
        experiment_name="cuj-big-data-training", save_artifact=True)
]

# Remove CPU resource so Datasets can be scheduled.
resources_per_worker = {"CPU": 0, "GPU": 1} if use_gpu else None
7 changes: 7 additions & 0 deletions doc/source/train/api.rst
@@ -72,6 +72,13 @@ TBXLoggerCallback

.. autoclass:: ray.train.callbacks.TBXLoggerCallback

.. _train-api-mlflow-logger-callback:

MLflowLoggerCallback
~~~~~~~~~~~~~~~~~~~~

.. autoclass:: ray.train.callbacks.MLflowLoggerCallback

Checkpointing
-------------

7 changes: 3 additions & 4 deletions doc/source/train/examples.rst
@@ -47,11 +47,10 @@ Horovod
  Simple example for Horovod (with TensorFlow)


Iterator API Examples
---------------------

Logger/Callback Examples
------------------------
* :doc:`/train/examples/mlflow_fashion_mnist_example`:
  Example for using the Iterator API for custom MLFlow integration.
  Example for logging training to MLflow via the ``MLflowLoggerCallback``.

Ray Datasets Integration Examples
---------------------------------
96 changes: 59 additions & 37 deletions doc/source/train/user_guide.rst
@@ -317,7 +317,7 @@ Log directories are exposed through the following attributes:

Logs will be written by:

1. :ref:`Logging Callbacks <train-logging-callbacks>`
1. :ref:`Callbacks <train-callbacks>`
2. :ref:`Checkpoints <train-checkpointing>`

.. TODO link to Training Run Iterator API as a 3rd option for logging.
@@ -327,6 +327,11 @@
Logging, Monitoring, and Callbacks
----------------------------------

Ray Train has mechanisms to easily collect intermediate results from the training workers during the training run,
and also has a :ref:`Callback interface <train-callbacks>` to perform actions on these intermediate results (such as logging, aggregation, or printing).
You can use either the :ref:`built-in callbacks <train-builtin-callbacks>` that Ray Train provides,
or implement a :ref:`custom callback <train-custom-callbacks>` for your use case.

Reporting intermediate results
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

@@ -393,53 +398,53 @@ Callbacks
~~~~~~~~~

You may want to plug in your training code with your favorite experiment management framework.
Ray Train provides an interface to fetch intermediate results and callbacks to process/log your intermediate results.
Ray Train provides an interface to fetch intermediate results and callbacks to process/log your intermediate results
(the values passed into ``train.report(...)``).

You can plug all of these into Ray Train with the following interface:
Ray Train contains built-in callbacks for popular tracking frameworks, or you can implement your own callback via the ``TrainingCallback`` interface.

.. code-block:: python

    from ray import train
    from ray.train import Trainer, TrainingCallback
    from typing import List, Dict

    class PrintingCallback(TrainingCallback):
        def handle_result(self, results: List[Dict], **info):
            print(results)

    def train_func():
        for i in range(3):
            train.report(epoch=i)

    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    result = trainer.run(
        train_func,
        callbacks=[PrintingCallback()]
    )
    # [{'epoch': 0, '_timestamp': 1630471763, '_time_this_iter_s': 0.0020279884338378906, '_training_iteration': 1}, {'epoch': 0, '_timestamp': 1630471763, '_time_this_iter_s': 0.0014922618865966797, '_training_iteration': 1}]
    # [{'epoch': 1, '_timestamp': 1630471763, '_time_this_iter_s': 0.0008401870727539062, '_training_iteration': 2}, {'epoch': 1, '_timestamp': 1630471763, '_time_this_iter_s': 0.0007486343383789062, '_training_iteration': 2}]
    # [{'epoch': 2, '_timestamp': 1630471763, '_time_this_iter_s': 0.0014500617980957031, '_training_iteration': 3}, {'epoch': 2, '_timestamp': 1630471763, '_time_this_iter_s': 0.0015292167663574219, '_training_iteration': 3}]
    trainer.shutdown()

.. Here is a list of callbacks that are supported by Ray Train:

.. * JsonLoggerCallback
.. * TBXLoggerCallback
.. * WandbCallback
.. * MlflowCallback
.. * CSVCallback

.. _train-logging-callbacks:

Logging Callbacks
+++++++++++++++++

The following ``TrainingCallback``\s are available and will write to a file within the
:ref:`log directory <train-log-dir>` of each training run.

1. :ref:`train-api-json-logger-callback`
2. :ref:`train-api-tbx-logger-callback`

.. _train-builtin-callbacks:

Built-in Callbacks
++++++++++++++++++

The following ``TrainingCallback``\s are available and will log the intermediate results of the training run.

1. :ref:`train-api-json-logger-callback`
2. :ref:`train-api-tbx-logger-callback`
3. :ref:`train-api-mlflow-logger-callback`

Example: Logging to MLflow and Tensorboard
++++++++++++++++++++++++++++++++++++++++++

**Step 1: Install the necessary packages**

.. code-block:: bash

    $ pip install mlflow
    $ pip install tensorboardX

**Step 2: Run the following training script**

.. literalinclude:: /../../python/ray/train/examples/mlflow_simple_example.py
   :language: python

**Step 3: Visualize the logs**

.. code-block:: bash

    # Navigate to the run directory of the trainer.
    # For example `cd /home/ray_results/train_2021-09-01_12-00-00/run_001`
    $ cd <TRAINER_RUN_DIR>

    # View the MLflow UI.
    $ mlflow ui

    # View the tensorboard UI.
    $ tensorboard --logdir .
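The ``mlflow_simple_example.py`` script referenced above is added elsewhere in this PR and is not shown in this diff. A plausible minimal sketch, assuming the callbacks introduced here with default arguments (the exact script contents and the experiment name are assumptions):

.. code-block:: python

    from ray import train
    from ray.train import Trainer
    from ray.train.callbacks import MLflowLoggerCallback, TBXLoggerCallback

    def train_func():
        # Each reported result is forwarded to every attached callback.
        for i in range(3):
            train.report(epoch=i)

    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    trainer.run(
        train_func,
        callbacks=[
            MLflowLoggerCallback(experiment_name="demo_mlflow_and_tbx"),
            TBXLoggerCallback(),
        ],
    )
    trainer.shutdown()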
.. _train-custom-callbacks:

Custom Callbacks
++++++++++++++++
@@ -454,12 +459,29 @@ A simple example for creating a callback that will print out results:

.. code-block:: python

    from ray.train import TrainingCallback
    from ray import train
    from ray.train import Trainer, TrainingCallback
    from typing import List, Dict

    class PrintingCallback(TrainingCallback):
        def handle_result(self, results: List[Dict], **info):
            print(results)

    def train_func():
        for i in range(3):
            train.report(epoch=i)

    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    result = trainer.run(
        train_func,
        callbacks=[PrintingCallback()]
    )
    # [{'epoch': 0, '_timestamp': 1630471763, '_time_this_iter_s': 0.0020279884338378906, '_training_iteration': 1}, {'epoch': 0, '_timestamp': 1630471763, '_time_this_iter_s': 0.0014922618865966797, '_training_iteration': 1}]
    # [{'epoch': 1, '_timestamp': 1630471763, '_time_this_iter_s': 0.0008401870727539062, '_training_iteration': 2}, {'epoch': 1, '_timestamp': 1630471763, '_time_this_iter_s': 0.0007486343383789062, '_training_iteration': 2}]
    # [{'epoch': 2, '_timestamp': 1630471763, '_time_this_iter_s': 0.0014500617980957031, '_training_iteration': 3}, {'epoch': 2, '_timestamp': 1630471763, '_time_this_iter_s': 0.0015292167663574219, '_training_iteration': 3}]
    trainer.shutdown()
..
Advanced Customization
9 changes: 9 additions & 0 deletions python/ray/train/BUILD
@@ -12,6 +12,15 @@ py_test(
args = ["--smoke-test"]
)

py_test(
name = "mlflow_simple_example",
size = "medium",
main = "examples/mlflow_simple_example.py",
srcs = ["examples/mlflow_simple_example.py"],
tags = ["team:ml", "exclusive"],
deps = [":train_lib"],
)

py_test(
    name = "tensorflow_quick_start",
    size = "medium",
8 changes: 6 additions & 2 deletions python/ray/train/callbacks/__init__.py
@@ -1,4 +1,8 @@
from ray.train.callbacks.callback import TrainingCallback
from ray.train.callbacks.logging import (JsonLoggerCallback, TBXLoggerCallback)
from ray.train.callbacks.logging import (
    JsonLoggerCallback, MLflowLoggerCallback, TBXLoggerCallback)

__all__ = ["TrainingCallback", "JsonLoggerCallback", "TBXLoggerCallback"]
__all__ = [
    "TrainingCallback", "JsonLoggerCallback", "MLflowLoggerCallback",
    "TBXLoggerCallback"
]
3 changes: 2 additions & 1 deletion python/ray/train/callbacks/callback.py
@@ -16,12 +16,13 @@ def handle_result(self, results: List[Dict], **info):
"""
pass

def start_training(self, logdir: str, **info):
def start_training(self, logdir: str, config: Dict, **info):
"""Called once on training start.
Args:
logdir (str): Path to the file directory where logs
should be persisted.
config (Dict): The config dict passed into ``trainer.run()``.
**info: kwargs dict for forward compatibility.
"""
pass
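With the new ``config`` argument, a callback can act on the hyperparameters passed into ``trainer.run()`` as soon as training starts. A hypothetical illustration (the class name is not part of this PR):

```python
from typing import Dict, List

from ray.train import TrainingCallback


class HyperparamPrintingCallback(TrainingCallback):
    """Illustrative callback that uses the new ``config`` argument."""

    def start_training(self, logdir: str, config: Dict, **info):
        # ``config`` is the dict passed into ``trainer.run()``,
        # now available when training starts.
        print(f"Logging to {logdir}; hyperparameters: {config}")

    def handle_result(self, results: List[Dict], **info):
        # One result dict per training worker.
        print(results)
```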
