[Train] [Tune] Refactor MLflow #20802

Merged on Dec 22, 2021 (43 commits)
Changes from 16 commits

Commits
9437251
wip
amogkam Nov 17, 2021
2720e9c
wip
amogkam Nov 20, 2021
9740782
wip
amogkam Nov 23, 2021
b73503e
wip
amogkam Nov 30, 2021
0331ef6
add file
amogkam Nov 30, 2021
4efb081
cleanup tune integration
amogkam Nov 30, 2021
4cdcb31
finish tune
amogkam Nov 30, 2021
ae64f04
add train
amogkam Nov 30, 2021
41e9b0d
update example
amogkam Nov 30, 2021
0557561
almost done
amogkam Nov 30, 2021
567924c
finish
amogkam Nov 30, 2021
b004dee
formatting
amogkam Nov 30, 2021
d02237a
Merge branch 'master' of https://github.com/ray-project/ray into trai…
amogkam Nov 30, 2021
06de0d9
CI
amogkam Nov 30, 2021
4755286
formatting
amogkam Nov 30, 2021
5950734
update error message
amogkam Nov 30, 2021
1faaea4
address comments
amogkam Dec 4, 2021
0683718
Address comments
amogkam Dec 4, 2021
de719eb
fix datasets+train test
amogkam Dec 4, 2021
e1c19d3
fix docs
amogkam Dec 4, 2021
7ab7814
Merge branch 'master' of https://github.com/ray-project/ray into trai…
amogkam Dec 13, 2021
ac564e9
address comments
amogkam Dec 13, 2021
15e9c21
fix failing tests
amogkam Dec 13, 2021
3cba50b
docs
amogkam Dec 13, 2021
939f3b2
Update .buildkite/pipeline.ml.yml
amogkam Dec 13, 2021
4379615
fix cuj example
amogkam Dec 13, 2021
34c64b7
Merge branch 'train-mlflow' of github.com:amogkam/ray into train-mlflow
amogkam Dec 13, 2021
61bfe2e
update examples page
amogkam Dec 13, 2021
192420e
Merge branch 'master' of https://github.com/ray-project/ray into trai…
amogkam Dec 20, 2021
8880fb1
update docs
amogkam Dec 20, 2021
4caad5b
update user guide
amogkam Dec 20, 2021
d031913
fix test
amogkam Dec 20, 2021
6d35f86
add more steps to the user guide
amogkam Dec 20, 2021
c827cb4
formatting
amogkam Dec 20, 2021
66c0b3c
update logs
amogkam Dec 20, 2021
4f9eb20
Merge branch 'master' of https://github.com/ray-project/ray into trai…
amogkam Dec 20, 2021
d2aa72c
update
amogkam Dec 21, 2021
a5ffbd0
updates
amogkam Dec 21, 2021
fb7325f
fix test
amogkam Dec 21, 2021
95fdc64
bump tensorboard version
amogkam Dec 21, 2021
b045256
fix
amogkam Dec 21, 2021
dd894ce
fix artifact recursion
amogkam Dec 21, 2021
c2f8dab
fix test
amogkam Dec 21, 2021
5 changes: 3 additions & 2 deletions .buildkite/pipeline.yml
@@ -715,7 +715,7 @@
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=client_unit_tests,-gpu_only --test_env=RAY_CLIENT_MODE=1 python/ray/util/sgd/...

- label: ":octopus: Tune/SGD/Modin/Dask tests and examples. Python 3.7"
- conditions: ["RAY_CI_TUNE_AFFECTED", "RAY_CI_SGD_AFFECTED"]
+ conditions: ["RAY_CI_TUNE_AFFECTED", "RAY_CI_TRAIN_AFFECTED"]
Contributor: Why are we removing RAY_CI_SGD_AFFECTED here?

Contributor Author: I don't think it was ever needed in the first place.

commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 PYTHON=3.7 INSTALL_HOROVOD=1 ./ci/travis/install-dependencies.sh
@@ -725,9 +725,10 @@
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=-client python/ray/util/xgboost/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/util/horovod/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/util/ray_lightning/...
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/util/ml_utils/...

- label: ":octopus: Ludwig tests and examples. Python 3.7"
- conditions: ["RAY_CI_TUNE_AFFECTED", "RAY_CI_SGD_AFFECTED"]
+ conditions: ["RAY_CI_TUNE_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- SGD_TESTING=1 PYTHON=3.7 INSTALL_LUDWIG=1 INSTALL_HOROVOD=1 ./ci/travis/install-dependencies.sh
7 changes: 7 additions & 0 deletions doc/source/train/api.rst
@@ -72,6 +72,13 @@ TBXLoggerCallback

.. autoclass:: ray.train.callbacks.TBXLoggerCallback

.. _train-api-mlflow-logger-callback:

MLflowLoggerCallback
~~~~~~~~~~~~~~~~~~~~

.. autoclass:: ray.train.callbacks.MLflowLoggerCallback

Checkpointing
-------------

4 changes: 2 additions & 2 deletions doc/source/train/user_guide.rst
@@ -435,11 +435,11 @@ You can plug all of these into Ray Train with the following interface:
Logging Callbacks
+++++++++++++++++

-The following ``TrainingCallback``\s are available and will write to a file within the
-:ref:`log directory <train-log-dir>` of each training run.
+The following ``TrainingCallback``\s are available and will log the intermediate results of the training run.

1. :ref:`train-api-json-logger-callback`
2. :ref:`train-api-tbx-logger-callback`
3. :ref:`train-api-mlflow-logger-callback`

Custom Callbacks
++++++++++++++++
12 changes: 9 additions & 3 deletions python/ray/train/callbacks/callback.py
@@ -1,5 +1,5 @@
import abc
-from typing import List, Dict
+from typing import List, Dict, Optional


class TrainingCallback(metaclass=abc.ABCMeta):
@@ -16,21 +16,27 @@ def handle_result(self, results: List[Dict], **info):
"""
pass

-def start_training(self, logdir: str, **info):
+def start_training(self, logdir: str, config: Dict, **info):
"""Called once on training start.

Args:
logdir (str): Path to the file directory where logs
should be persisted.
config (Dict): The config dict passed into ``trainer.run()``.
**info: kwargs dict for forward compatibility.
"""
pass

-def finish_training(self, error: bool = False, **info):
+def finish_training(self,
+error: bool = False,
+run_dir: Optional[str] = None,
+**info):
"""Called once after training is over.

Args:
error (bool): If True, there was an exception during training.
run_dir (Optional[str]): The path to the directory for this
training run.
**info: kwargs dict for forward compatibility.
"""
pass
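
A minimal sketch of how a custom callback could use the updated hook signatures (`config` in `start_training`, `run_dir` in `finish_training`). The base class here is a stand-in that only mirrors the signatures in this diff so the snippet runs without Ray; `ConfigEchoCallback` and the `/tmp/run_001` path are hypothetical names used for illustration.

```python
import abc
from typing import Dict, List, Optional


class TrainingCallback(metaclass=abc.ABCMeta):
    """Stand-in mirroring the hook signatures in this diff (not Ray's class)."""

    def handle_result(self, results: List[Dict], **info):
        pass

    def start_training(self, logdir: str, config: Dict, **info):
        pass

    def finish_training(self,
                        error: bool = False,
                        run_dir: Optional[str] = None,
                        **info):
        pass


class ConfigEchoCallback(TrainingCallback):
    """Hypothetical callback that records what each hook receives."""

    def __init__(self):
        self.events = []

    def start_training(self, logdir: str, config: Dict, **info):
        # The config dict is now available at training start.
        self.events.append(("start", logdir, dict(config)))

    def handle_result(self, results: List[Dict], **info):
        # One result dict per worker; keep only worker 0 here.
        self.events.append(("result", results[0]))

    def finish_training(self,
                        error: bool = False,
                        run_dir: Optional[str] = None,
                        **info):
        # run_dir is optional, so older callers that omit it still work.
        self.events.append(("finish", error, run_dir))


callback = ConfigEchoCallback()
callback.start_training(logdir="/tmp/run_001", config={"lr": 1e-3})
callback.handle_result([{"loss": 0.5}, {"loss": 0.7}])
callback.finish_training(error=False, run_dir="/tmp/run_001")
```

Because every hook also accepts `**info`, a callback written this way keeps working if further keyword arguments are passed later.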
95 changes: 95 additions & 0 deletions python/ray/train/callbacks/logging.py
@@ -12,6 +12,7 @@
from ray.train.callbacks import TrainingCallback
from ray.train.constants import (RESULT_FILE_JSON, TRAINING_ITERATION,
TIME_TOTAL_S, TIMESTAMP, PID)
from ray.util.ml_utils.mlflow import MLflowLoggerUtil

logger = logging.getLogger(__name__)

@@ -174,6 +175,100 @@ def _validate_worker_to_log(self, worker_to_log) -> int:
return worker_to_log


class MLflowLoggerCallback(TrainingSingleWorkerLoggingCallback):
"""MLflow Logger to automatically log Train results and config to MLflow.

MLflow (https://mlflow.org) Tracking is an open source library for
recording and querying experiments. This Ray Train callback
sends information (config parameters, training results & metrics,
and artifacts) to MLflow for automatic experiment tracking.

Args:
tracking_uri (Optional[str]): The tracking URI for where to manage
experiments and runs. This can either be a local file path or a
remote server. This arg gets passed directly to mlflow
initialization.
registry_uri (Optional[str]): The registry URI that gets passed
directly to mlflow initialization.
experiment_id (Optional[str]): The experiment id of an already
existing experiment. If not
passed in, experiment_name will be used.
experiment_name (Optional[str]): The experiment name to use for this
Train run.
If the experiment with the name already exists with MLflow,
it will be used. If not, a new experiment will be created with
this name.
Contributor: Can you document the behavior when these are None?

Contributor Author: Could you elaborate more on what you'd like to see here? There is information in the description for the arguments on the behavior if None is passed in.

Contributor: Ah, so I'm not clear from this doc what the behavior is if tracking_uri or registry_uri is None. Taking another look at the implementation and docs I now see that it's mentioned in logdir doc - it would be helpful to include this information in each of these parameter docs as well.

I also can't tell from this doc at all what happens if experiment_name is None (and the corresponding environment variable is not set). From the error message, it seems like at least experiment_name or experiment_id must be set.

Contributor Author: Got it, thanks for the explanation! Added it to the docstring.

tags (Optional[Dict]): An optional dictionary of string keys and
values to set as tags on the run.
save_artifact (bool): If set to True, automatically save the entire
contents of the Train local_dir as an artifact to the
corresponding run in MLflow.
logdir (Optional[str]): Path to directory where the results file
should be. If None, will be set by the Trainer. If no tracking
uri or registry uri are passed in, the logdir will be used for
both.
worker_to_log (int): Worker index to log. By default, will log the
worker with index 0.
"""

def __init__(self,
tracking_uri: Optional[str] = None,
registry_uri: Optional[str] = None,
experiment_id: Optional[str] = None,
experiment_name: Optional[str] = None,
tags: Optional[Dict] = None,
save_artifact: bool = False,
logdir: Optional[str] = None,
worker_to_log: int = 0):
super().__init__(logdir=logdir, worker_to_log=worker_to_log)

self.tracking_uri = tracking_uri
self.registry_uri = registry_uri
self.experiment_id = experiment_id
self.experiment_name = experiment_name
self.tags = tags

self.save_artifact = save_artifact
self.mlflow_util = MLflowLoggerUtil()

def start_training(self, logdir: str, config: Dict, **info):
super().start_training(logdir=logdir, config=config, **info)

tracking_uri = self.tracking_uri if self.tracking_uri is not None \
else \
str(self.logdir)
registry_uri = self.registry_uri if self.registry_uri is not None \
else \
str(self.logdir)

success = self.mlflow_util.setup_mlflow(
tracking_uri=tracking_uri,
registry_uri=registry_uri,
experiment_id=self.experiment_id,
experiment_name=self.experiment_name,
create_experiment_if_not_exists=True)

if not success:
raise ValueError("No experiment_name or experiment_id passed in. "
"Please set one of these to use the "
"MLflowLoggerCallback.")

self.mlflow_util.start_run(tags=self.tags, set_active=True)
self.mlflow_util.log_params(params_to_log=config)

def handle_result(self, results: List[Dict], **info):
result = results[self._workers_to_log]

self.mlflow_util.log_metrics(
metrics_to_log=result, step=result[TRAINING_ITERATION])

def finish_training(self, error: bool = False, **info):
if self.save_artifact:
self.mlflow_util.save_artifacts(dir=str(self.logdir))
self.mlflow_util.end_run(status="FAILED" if error else "FINISHED")


class TBXLoggerCallback(TrainingSingleWorkerLoggingCallback):
"""Logs Train results in TensorboardX format.

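
The URI fallback in `MLflowLoggerCallback.start_training` can be read in isolation as the sketch below. It assumes only what the diff shows: when `tracking_uri` or `registry_uri` is None, the log directory is used in its place. The helper name `resolve_mlflow_uris` and the example paths are hypothetical and not part of the PR.

```python
from typing import Optional, Tuple


def resolve_mlflow_uris(tracking_uri: Optional[str],
                        registry_uri: Optional[str],
                        logdir: str) -> Tuple[str, str]:
    """Fall back to the log directory for whichever URI was not given.

    Hypothetical helper; it mirrors the two conditional expressions in
    start_training rather than any MLflow or Ray API.
    """
    resolved_tracking = (tracking_uri
                         if tracking_uri is not None else str(logdir))
    resolved_registry = (registry_uri
                         if registry_uri is not None else str(logdir))
    return resolved_tracking, resolved_registry


# Neither URI given: both point at the run's log directory.
local_only = resolve_mlflow_uris(None, None, "/tmp/run_001")

# Only the tracking URI given: the registry still falls back to logdir.
mixed = resolve_mlflow_uris("http://localhost:5000", None, "/tmp/run_001")
```

This is the behavior the review thread asks to have documented on each parameter: an unset URI does not disable logging, it redirects it to the run directory.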
18 changes: 7 additions & 11 deletions python/ray/train/examples/mlflow_fashion_mnist_example.py
@@ -1,30 +1,26 @@
import argparse

-import mlflow
-
from ray.train import Trainer
from ray.train.examples.train_fashion_mnist_example import train_func
+from ray.train.callbacks.logging import MLflowLoggerCallback


def main(num_workers=2, use_gpu=False):
-mlflow.set_experiment("train_torch_fashion_mnist")
-
trainer = Trainer(
backend="torch", num_workers=num_workers, use_gpu=use_gpu)
trainer.start()
-iterator = trainer.run_iterator(
+final_results = trainer.run(
train_func=train_func,
config={
"lr": 1e-3,
"batch_size": 64,
"epochs": 4
-})
-
-for intermediate_result in iterator:
-first_worker_result = intermediate_result[0]
-mlflow.log_metric("loss", first_worker_result["loss"])
+},
+callbacks=[
+MLflowLoggerCallback(experiment_name="train_fashion_mnist")
+])

-print("Full losses for rank 0 worker: ", iterator.get_final_results())
+print("Full losses for rank 0 worker: ", final_results)


if __name__ == "__main__":
51 changes: 49 additions & 2 deletions python/ray/train/tests/test_callbacks.py
@@ -15,6 +15,7 @@
from ray.train.callbacks import JsonLoggerCallback, TBXLoggerCallback
from ray.train.backend import BackendConfig, Backend
from ray.train.worker_group import WorkerGroup
from ray.train.callbacks.logging import MLflowLoggerCallback

try:
from tensorflow.python.summary.summary_iterator \
@@ -136,8 +137,6 @@ def _validate_tbx_result(events_dir):
assert len(results["hello/world"]) == 1


-@pytest.mark.skipif(
-summary_iterator is None, reason="tensorboard is not installed")
def test_TBX(ray_start_4_cpus, make_temp_dir):
config = TestConfig()

@@ -159,6 +158,54 @@ def train_func():
_validate_tbx_result(temp_dir)


def test_mlflow(ray_start_4_cpus, make_temp_dir):
config = TestConfig()

params = {"p1": "p1"}

temp_dir = make_temp_dir
num_workers = 4

def train_func(config):
train.report(episode_reward_mean=4)
train.report(episode_reward_mean=5)
train.report(episode_reward_mean=6)
return 1

callback = MLflowLoggerCallback(
experiment_name="test_exp", logdir=temp_dir)
trainer = Trainer(config, num_workers=num_workers)
trainer.start()
trainer.run(train_func, config=params, callbacks=[callback])

from mlflow.tracking import MlflowClient

client = MlflowClient(
tracking_uri=callback.mlflow_util._mlflow.get_tracking_uri())

all_runs = callback.mlflow_util._mlflow.search_runs(experiment_ids=["0"])
assert len(all_runs) == 1
# all_runs is a pandas dataframe.
all_runs = all_runs.to_dict(orient="records")
run_id = all_runs[0]["run_id"]
run = client.get_run(run_id)

assert run.data.params == params
assert "episode_reward_mean" in run.data.metrics and \
run.data.metrics["episode_reward_mean"] == 6.0
assert TRAINING_ITERATION in run.data.metrics and \
run.data.metrics[TRAINING_ITERATION] == 3.0

metric_history = client.get_metric_history(
run_id=run_id, key="episode_reward_mean")

assert len(metric_history) == 3
iterations = [metric.step for metric in metric_history]
assert iterations == [1, 2, 3]
rewards = [metric.value for metric in metric_history]
assert rewards == [4, 5, 6]


if __name__ == "__main__":
import pytest
import sys
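
The metric-history assertions in `test_mlflow` rely on `handle_result` logging one worker's result per report, with the training iteration as the step. The sketch below reproduces that flow against an in-memory store so it runs without MLflow or Ray. `InMemoryMetricStore` is a hypothetical stand-in, not an MLflow API, and the value of `TRAINING_ITERATION` is a placeholder for the constant in `ray.train.constants`.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Placeholder key; the real constant lives in ray.train.constants.
TRAINING_ITERATION = "training_iteration"


class InMemoryMetricStore:
    """Hypothetical stand-in for a tracking backend; not an MLflow API."""

    def __init__(self):
        self.history: Dict[str, List[Tuple[int, float]]] = defaultdict(list)

    def log_metrics(self, metrics_to_log: Dict[str, float], step: int) -> None:
        # Append one (step, value) point per metric key.
        for key, value in metrics_to_log.items():
            self.history[key].append((step, float(value)))


def handle_result(store: InMemoryMetricStore,
                  results: List[Dict],
                  worker_to_log: int = 0) -> None:
    # Only one worker's result is logged, keyed by its iteration counter.
    result = results[worker_to_log]
    store.log_metrics(metrics_to_log=result, step=result[TRAINING_ITERATION])


store = InMemoryMetricStore()
for iteration, reward in enumerate([4, 5, 6], start=1):
    # Four workers report the same values, as in the test's train_func.
    per_worker = [{
        "episode_reward_mean": reward,
        TRAINING_ITERATION: iteration
    } for _ in range(4)]
    handle_result(store, per_worker)

steps = [step for step, _ in store.history["episode_reward_mean"]]
values = [value for _, value in store.history["episode_reward_mean"]]
```

With four workers and three reports, only three points per metric are recorded, which is why the test expects a history of length 3 rather than 12.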
8 changes: 6 additions & 2 deletions python/ray/train/trainer.py
@@ -281,7 +281,9 @@ def run(self,
finished_with_errors = False

for callback in callbacks:
-callback.start_training(logdir=self.latest_run_dir)
+callback.start_training(
+logdir=str(self.latest_run_dir),
+config=config if config else {})

train_func = self._get_train_func(train_func, config)

@@ -304,7 +306,9 @@ def run(self,
return iterator.get_final_results()
finally:
for callback in callbacks:
-callback.finish_training(error=finished_with_errors)
+callback.finish_training(
+error=finished_with_errors,
+run_dir=str(self.latest_run_dir))

def run_iterator(
self,
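
A rough sketch of the callback lifecycle that the `trainer.py` hunks suggest: every callback gets `start_training` with the run directory and config, and `finish_training` runs in a `finally` block with the error flag and run directory. The diff does not show how `finished_with_errors` is set, so the `except` branch here is an assumption. `run_with_callbacks` and `RecordingCallback` are hypothetical names, not Ray APIs.

```python
from typing import Callable, Dict, List, Optional


class RecordingCallback:
    """Hypothetical callback that records the lifecycle calls it receives."""

    def __init__(self):
        self.calls = []

    def start_training(self, logdir: str, config: Dict, **info):
        self.calls.append(("start", logdir, config))

    def finish_training(self,
                        error: bool = False,
                        run_dir: Optional[str] = None,
                        **info):
        self.calls.append(("finish", error, run_dir))


def run_with_callbacks(train_func: Callable[[Dict], object],
                       run_dir: str,
                       config: Optional[Dict] = None,
                       callbacks: Optional[List[RecordingCallback]] = None):
    callbacks = callbacks or []
    finished_with_errors = False

    for callback in callbacks:
        # A missing config is passed to callbacks as an empty dict.
        callback.start_training(
            logdir=str(run_dir), config=config if config else {})
    try:
        return train_func(config or {})
    except Exception:
        # Assumed: any exception marks the run as failed before re-raising.
        finished_with_errors = True
        raise
    finally:
        # Runs on both success and failure, so runs are always closed.
        for callback in callbacks:
            callback.finish_training(
                error=finished_with_errors, run_dir=str(run_dir))


ok = RecordingCallback()
result = run_with_callbacks(lambda cfg: cfg["lr"] * 2, "/tmp/run_001",
                            {"lr": 2}, [ok])


def failing(cfg):
    raise RuntimeError("boom")


failed = RecordingCallback()
try:
    run_with_callbacks(failing, "/tmp/run_002", None, [failed])
except RuntimeError:
    pass
```

Closing in `finally` is what lets `MLflowLoggerCallback.finish_training` end the MLflow run as "FAILED" rather than leaving it open when training raises.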