Fixes for large scale hyperopt #2083

Merged 7 commits on Jun 7, 2022
Changes from all commits
1 change: 0 additions & 1 deletion ludwig/backend/ray.py
@@ -720,7 +720,6 @@ def initialize(self):
ray.init(ignore_reinit_error=True)

dask.config.set(scheduler=ray_dask_get)
# TODO(shreya): Untested
# Disable placement groups on dask
dask.config.set(annotations={"ray_remote_args": {"placement_group": None}})

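For context, a hedged standalone sketch of the Dask-on-Ray configuration that `initialize()` applies above; the toy dataframe and its column name are illustrative only.

```python
import dask
import dask.dataframe as dd
import pandas as pd
import ray
from ray.util.dask import ray_dask_get

ray.init(ignore_reinit_error=True)

# Route Dask task execution through Ray, then annotate Dask tasks so they are
# not captured by any active placement group (e.g. one reserved by Ray Tune).
dask.config.set(scheduler=ray_dask_get)
dask.config.set(annotations={"ray_remote_args": {"placement_group": None}})

# Illustrative computation: runs as Ray tasks outside any placement group.
df = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)
print(df["x"].sum().compute())
```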
18 changes: 15 additions & 3 deletions ludwig/data/dataset/ray.py
@@ -19,7 +19,7 @@
import threading
from distutils.version import LooseVersion
from functools import lru_cache
from typing import Any, Dict, Iterator, Union
from typing import Any, Dict, Iterator, Optional, Union

import numpy as np
import pandas as pd
@@ -72,15 +72,27 @@ def __init__(
# return df
# self.ds = self.ds.map_batches(to_tensors, batch_format="pandas")

def pipeline(self, shuffle=True, fully_executed=True) -> DatasetPipeline:
def pipeline(
self, shuffle: bool = True, fully_executed: bool = True, window_size_bytes: Optional[int] = None
) -> DatasetPipeline:
"""
Args:
shuffle: If true, the entire dataset is shuffled in memory before batching.
fully_executed: If true, force full evaluation of the Ray Dataset by loading all blocks into memory.
window_size_bytes: If not None, windowing is enabled and this parameter specifies the window size in bytes
for the dataset.
"""
if not fully_executed and not _ray112:
raise ValueError(f"Cannot set fully_executed=False in ray {ray.__version__}")

if fully_executed and _ray112:
# set instance state so calls to __len__ will also use the fully_executed version
self.ds = self.ds.fully_executed()

pipe = self.ds.repeat()
if window_size_bytes is None:
pipe = self.ds.repeat()
else:
pipe = self.ds.window(bytes_per_window=window_size_bytes).repeat()
if shuffle:
pipe = pipe.random_shuffle_each_window()
return pipe
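A minimal sketch of the windowed pipeline path added above, using a toy Ray Dataset in place of `self.ds`; `make_pipeline` is an illustrative name and the window size is an arbitrary example value.

```python
import ray

def make_pipeline(ds, shuffle=True, window_size_bytes=None):
    # Mirrors the branch added in RayDataset.pipeline() above.
    if window_size_bytes is None:
        # Default path: repeat the (fully executed) dataset each epoch.
        pipe = ds.repeat()
    else:
        # Windowed path: stream fixed-size windows so the full dataset never
        # has to sit in memory at once.
        pipe = ds.window(bytes_per_window=window_size_bytes).repeat()
    if shuffle:
        pipe = pipe.random_shuffle_each_window()
    return pipe

ds = ray.data.range(10_000)  # toy dataset standing in for the RayDataset's self.ds
pipe = make_pipeline(ds, window_size_bytes=500 * 1024 * 1024)  # e.g. 500 MiB windows
```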
19 changes: 12 additions & 7 deletions ludwig/hyperopt/execution.py
@@ -448,14 +448,16 @@ def on_eval_end(self, trainer, progress_tracker, save_path):
progress_tracker.tune_checkpoint_num += 1
self.last_steps = progress_tracker.steps
self._checkpoint_progress(trainer, progress_tracker, save_path)
report(progress_tracker)
if not is_using_ray_backend:
report(progress_tracker)

def on_trainer_train_teardown(self, trainer, progress_tracker, save_path, is_coordinator):
if is_coordinator and progress_tracker.steps > self.last_steps:
# Note: Calling tune.report in both on_eval_end() and here can cause multiprocessing issues
# for some Ray samplers if no steps have happened since the last eval.
self._checkpoint_progress(trainer, progress_tracker, save_path)
report(progress_tracker)
if not is_using_ray_backend:
report(progress_tracker)

callbacks = hyperopt_dict.get("callbacks") or []
hyperopt_dict["callbacks"] = callbacks + [RayTuneReportCallback()]
@@ -474,7 +476,7 @@ def on_trainer_train_teardown(self, trainer, progress_tracker, save_path, is_coordinator):
"use_gpu": use_gpu,
"resources_per_worker": {
"CPU": num_cpus,
"GPU": num_gpus,
"GPU": 1 if use_gpu else 0,
},
}
hyperopt_dict["backend"].set_distributed_kwargs(**hvd_kwargs)
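The resource change above makes each Horovod worker request a single GPU instead of the trial's `num_gpus` total, presumably so the per-worker ask is not multiplied by the worker count. A short sketch of the resulting kwargs, assuming `resources_per_worker` is interpreted per worker (the values below are illustrative):

```python
use_gpu = True   # illustrative values
num_cpus = 2

# Per-worker resources passed to the Horovod-on-Ray backend: each worker
# requests exactly one GPU when GPU training is enabled, otherwise none.
hvd_kwargs = {
    "use_gpu": use_gpu,
    "resources_per_worker": {
        "CPU": num_cpus,
        "GPU": 1 if use_gpu else 0,
    },
}
```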
@@ -492,22 +494,25 @@ def _run():
stats.append((train_stats, eval_stats))

sync_info = self._get_sync_client_and_remote_checkpoint_dir(trial_dir)
if is_using_ray_backend and sync_info is not None:
if is_using_ray_backend:
# We have to pull the results to the trial actor
# from worker actors, as the Tune session is running
# only on the trial actor
thread = threading.Thread(target=_run)
thread.daemon = True
thread.start()

sync_client, remote_checkpoint_dir = sync_info
sync_client = None
if sync_info is not None:
sync_client, remote_checkpoint_dir = sync_info

def check_queue():
qsize = ray_queue.qsize()
if qsize:
results = ray_queue.get_nowait_batch(qsize)
sync_client.sync_down(remote_checkpoint_dir, str(trial_dir.absolute()))
sync_client.wait()
if sync_client is not None:
sync_client.sync_down(remote_checkpoint_dir, str(trial_dir.absolute()))
sync_client.wait()
for progress_tracker, save_path in results:
checkpoint(progress_tracker, str(trial_dir.joinpath(Path(save_path))))
report(progress_tracker)
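To make the control flow above easier to follow, here is a hedged, self-contained sketch of the same producer/consumer pattern: workers push results onto a Ray queue, training runs in a daemon thread, and the trial process polls the queue. `train_fn` and `handle_result` are illustrative names, and checkpoint syncing is omitted.

```python
import threading
import time

import ray
from ray.util.queue import Queue

ray.init(ignore_reinit_error=True)
results_queue = Queue(actor_options={"num_cpus": 0})

@ray.remote
def train_fn(queue):
    # Workers push intermediate results rather than calling tune.report(),
    # since the Tune session only exists on the trial actor.
    for step in range(3):
        queue.put({"step": step})
        time.sleep(0.1)

def handle_result(result):
    # On the trial actor this is where checkpoint(...) and report(...) would run.
    print("got", result)

# Run training in a daemon thread so this thread stays free to poll the queue.
result_ref = train_fn.remote(results_queue)
thread = threading.Thread(target=lambda: ray.get(result_ref), daemon=True)
thread.start()

while thread.is_alive():
    qsize = results_queue.qsize()
    if qsize:
        for result in results_queue.get_nowait_batch(qsize):
            handle_result(result)
    time.sleep(0.1)
thread.join()

# Drain anything that arrived after the last poll.
remaining = results_queue.qsize()
if remaining:
    for result in results_queue.get_nowait_batch(remaining):
        handle_result(result)
```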