[air] New train.Checkpoint API: Update release tests (batch 1) (ray…
justinvyu authored and vitsai committed Aug 19, 2023
1 parent 4c42e4a commit ca02af7
Showing 13 changed files with 230 additions and 677 deletions.
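
The files in this batch all follow the same migration: dict-based checkpoints (`Checkpoint.from_dict`, `LegacyTorchCheckpoint`) are replaced by directory-backed checkpoints built with the new `ray.train._checkpoint.Checkpoint` and reported through `train.report`. A minimal sketch of that pattern, assuming an illustrative `state.pkl` file name and `num_epochs` config key that are not taken from this diff:

import os
import pickle
import tempfile

from ray import train
from ray.train._checkpoint import Checkpoint


def train_fn(config):
    # Restore: open the latest reported checkpoint as a local directory.
    start_epoch = 0
    checkpoint = train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            with open(os.path.join(checkpoint_dir, "state.pkl"), "rb") as f:
                start_epoch = pickle.load(f)["epoch"] + 1

    for epoch in range(start_epoch, config["num_epochs"]):
        # Save: write files into a temporary directory and report it as a
        # directory-backed Checkpoint while the directory still exists.
        with tempfile.TemporaryDirectory() as tmpdir:
            with open(os.path.join(tmpdir, "state.pkl"), "wb") as f:
                pickle.dump({"epoch": epoch}, f)
            train.report({"epoch": epoch}, checkpoint=Checkpoint.from_directory(tmpdir))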
2 changes: 1 addition & 1 deletion python/ray/tune/BUILD
@@ -815,7 +815,7 @@ py_test(
     size = "small",
     srcs = ["examples/pbt_memnn_example.py"],
     deps = [":tune_lib"],
-    tags = ["team:ml", "exclusive", "example"],
+    tags = ["team:ml", "exclusive", "example", "no_new_storage"],
     args = ["--smoke-test"]
 )

15 changes: 10 additions & 5 deletions python/ray/tune/utils/release_test_util.py
@@ -3,9 +3,11 @@
 import numpy as np
 import os
 import pickle
+import tempfile
 import time

-from ray import tune
+from ray import train, tune
+from ray.train._checkpoint import Checkpoint
 from ray.tune.callback import Callback
 from ray._private.test_utils import safe_write_to_results_json

@@ -65,7 +67,7 @@ def save_checkpoint(self, tmp_checkpoint_dir):
         checkpoint_data = np.random.uniform(0, 1, size=self._checkpoint_num_items)
         with open(checkpoint_file, "wb") as fp:
             pickle.dump(checkpoint_data, fp)
-        return checkpoint_file
+        return tmp_checkpoint_dir

     def load_checkpoint(self, checkpoint):
         pass
@@ -82,19 +84,22 @@ def function_trainable(config):
     checkpoint_num_files = config["checkpoint_num_files"]

     for i in range(num_iters):
+        metrics = {"score": i + score}
         if (
             checkpoint_iters >= 0
             and checkpoint_size_b > 0
             and i % checkpoint_iters == 0
         ):
-            with tune.checkpoint_dir(step=i) as dir:
+            with tempfile.TemporaryDirectory() as tmpdir:
                 for i in range(checkpoint_num_files):
-                    checkpoint_file = os.path.join(dir, f"bogus_{i}.ckpt")
+                    checkpoint_file = os.path.join(tmpdir, f"bogus_{i}.ckpt")
                     checkpoint_data = np.random.uniform(0, 1, size=checkpoint_num_items)
                     with open(checkpoint_file, "wb") as fp:
                         pickle.dump(checkpoint_data, fp)
+                train.report(metrics, checkpoint=Checkpoint.from_directory(tmpdir))
+        else:
+            train.report(metrics)

-        tune.report(score=i + score)
         time.sleep(sleep_time)


17 changes: 12 additions & 5 deletions release/air_tests/air_benchmarks/workloads/pytorch_training_e2e.py
@@ -2,19 +2,23 @@
 import time
 import json
 import os
+import tempfile
 from typing import Dict

 import numpy as np
 from torchvision import transforms
 from torchvision.models import resnet18
+import torch
 import torch.nn as nn
 import torch.optim as optim

 import ray
-from ray.train.torch import LegacyTorchCheckpoint
+from ray.air import Checkpoint
 from ray.data.preprocessors import BatchMapper, Chain, TorchVisionPreprocessor
 from ray import train
 from ray.train import RunConfig, ScalingConfig
+from ray.train._checkpoint import Checkpoint as NewCheckpoint
+from ray.train._internal.storage import _use_storage_context
 from ray.train.torch import TorchTrainer


@@ -55,10 +59,13 @@ def train_loop_per_worker(config):
                 print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}")
                 running_loss = 0.0

-        train.report(
-            dict(running_loss=running_loss),
-            checkpoint=LegacyTorchCheckpoint.from_model(model),
-        )
+        checkpoint_cls = NewCheckpoint if _use_storage_context() else Checkpoint
+        with tempfile.TemporaryDirectory() as tmpdir:
+            torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
+            train.report(
+                dict(running_loss=running_loss),
+                checkpoint=checkpoint_cls.from_directory(tmpdir),
+            )


 @click.command(help="Run Batch prediction on Pytorch ResNet models.")
26 changes: 15 additions & 11 deletions release/air_tests/frequent_pausing/script.py
@@ -15,30 +15,34 @@
 """

 import numpy as np
+import os
+import pickle
+import tempfile

 from ray import train
-from ray.train import Checkpoint, RunConfig
+from ray.train import RunConfig
+from ray.train._checkpoint import Checkpoint
 from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler
 from ray.tune.tune_config import TuneConfig
 from ray.tune.tuner import Tuner


 def func(config):
     starting_epoch = 0
-    if train.get_checkpoint():
-        checkpoint_dict = train.get_checkpoint().to_dict()

+    checkpoint = train.get_checkpoint()
+    if checkpoint:
+        with checkpoint.as_directory() as checkpoint_dir:
+            with open(os.path.join(checkpoint_dir, "ckpt.pkl"), "rb") as f:
+                checkpoint_dict = pickle.load(f)
         checkpoint_epoch = checkpoint_dict["epoch"]
         starting_epoch = checkpoint_epoch + 1

     for epoch in range(starting_epoch, 1000):
-        checkpoint = Checkpoint.from_dict(
-            {
-                "epoch": epoch,
-                "large_data": np.zeros(10000000),
-            }
-        )
-        train.report({}, checkpoint=checkpoint)
+        checkpoint_dict = {"epoch": epoch, "large_data": np.zeros(10000000)}
+        with tempfile.TemporaryDirectory() as tmpdir:
+            with open(os.path.join(tmpdir, "ckpt.pkl"), "wb") as f:
+                pickle.dump(checkpoint_dict, f)
+            train.report({}, checkpoint=Checkpoint.from_directory(tmpdir))


class FrequentPausesScheduler(FIFOScheduler):
31 changes: 19 additions & 12 deletions release/air_tests/horovod/workloads/horovod_tune_test.py
@@ -1,3 +1,7 @@
+import os
+from pathlib import Path
+import tempfile
+
 import numpy as np
 import torch
 import torch.nn as nn
@@ -8,12 +12,12 @@

 import ray
 from ray.train import (
-    Checkpoint,
     CheckpointConfig,
     FailureConfig,
     RunConfig,
     ScalingConfig,
 )
+from ray.train._checkpoint import Checkpoint
 import ray.train.torch
 from ray.train.horovod import HorovodTrainer
 from ray import train, tune
@@ -52,10 +56,13 @@ def train_loop_per_worker(config):

     checkpoint = train.get_checkpoint()
     if checkpoint:
-        checkpoint_dict = checkpoint.to_dict()
-        model_state = checkpoint_dict["model_state"]
-        optimizer_state = checkpoint_dict["optimizer_state"]
-        epoch = checkpoint_dict["epoch"] + 1
+        with checkpoint.as_directory() as checkpoint_dir:
+            checkpoint_dir = Path(checkpoint_dir)
+            model_state = torch.load(checkpoint_dir / "model.pt", map_location="cpu")
+            optimizer_state = torch.load(
+                checkpoint_dir / "optim.pt", map_location="cpu"
+            )
+            epoch = torch.load(checkpoint_dir / "extra_state.pt")["epoch"] + 1

     net.load_state_dict(model_state)
     optimizer.load_state_dict(optimizer_state)
@@ -111,14 +118,14 @@ def train_loop_per_worker(config):
         if config["smoke_test"]:
             break

-        checkpoint = Checkpoint.from_dict(
-            dict(
-                model_state=net.state_dict(),
-                optimizer_state=optimizer.state_dict(),
-                epoch=epoch,
+        with tempfile.TemporaryDirectory() as tmpdir:
+            torch.save(net.state_dict(), os.path.join(tmpdir, "model.pt"))
+            torch.save(optimizer.state_dict(), os.path.join(tmpdir, "optim.pt"))
+            torch.save({"epoch": epoch}, os.path.join(tmpdir, "extra_state.pt"))
+            train.report(
+                dict(loss=running_loss / epoch_steps),
+                checkpoint=Checkpoint.from_directory(tmpdir),
             )
-        )
-        train.report(dict(loss=running_loss / epoch_steps), checkpoint=checkpoint)


if __name__ == "__main__":
22 changes: 13 additions & 9 deletions release/golden_notebook_tests/workloads/torch_tune_serve_test.py
@@ -2,11 +2,13 @@
 import atexit
 import json
 import os
+import tempfile
 import time
 import subprocess

 import ray
 from ray.train import ScalingConfig, RunConfig
+from ray.train._checkpoint import Checkpoint
 from ray.air.util.node import _force_on_current_node
 from ray.tune.tune_config import TuneConfig
 import requests
@@ -15,7 +17,7 @@
 import torchvision.transforms as transforms
 from filelock import FileLock
 from ray import serve, tune, train
-from ray.train.torch import TorchTrainer, LegacyTorchCheckpoint
+from ray.train.torch import TorchTrainer
 from ray.tune import Tuner
 from torch.utils.data import DataLoader, Subset
 from torchvision.datasets import MNIST
@@ -99,10 +101,9 @@ def training_loop(config):
         train_epoch(train_loader, model, criterion, optimizer)
         validation_loss = validate_epoch(validation_loader, model, criterion)

-        train.report(
-            validation_loss,
-            checkpoint=LegacyTorchCheckpoint.from_state_dict(model.module.state_dict()),
-        )
+        with tempfile.TemporaryDirectory() as tmpdir:
+            torch.save(model.module.state_dict(), os.path.join(tmpdir, "model.pt"))
+            train.report(validation_loss, checkpoint=Checkpoint.from_directory(tmpdir))


def train_mnist(test_mode=False, num_workers=1, use_gpu=False):
@@ -144,12 +145,15 @@ def get_remote_model(remote_model_checkpoint_path):


 def get_model(model_checkpoint_path):
-    checkpoint_dict = LegacyTorchCheckpoint.from_directory(model_checkpoint_path)
-    model_state = checkpoint_dict.to_dict()["model"]
-
     model = resnet18()
     model.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=1, padding=3, bias=False)
-    model.load_state_dict(model_state)
+
+    checkpoint = Checkpoint(path=model_checkpoint_path)
+    with checkpoint.as_directory() as checkpoint_dir:
+        model_state_dict = torch.load(
+            os.path.join(checkpoint_dir, "model.pt"), map_location="cpu"
+        )
+        model.load_state_dict(model_state_dict)

     return model

7 changes: 6 additions & 1 deletion release/jobs_tests/workloads/jobs_basic.py
@@ -15,6 +15,7 @@
 from ray.dashboard.modules.job.common import JobStatus

 from ray.job_submission import JobSubmissionClient
+from ray.train.constants import RAY_AIR_NEW_PERSISTENCE_MODE


 def wait_until_finish(
@@ -58,7 +59,11 @@ def wait_until_finish(
     client = JobSubmissionClient(address)
     job_id = client.submit_job(
         entrypoint="python run_simple_tune_job.py",
-        runtime_env={"pip": ["ray[tune]"], "working_dir": args.working_dir},
+        runtime_env={
+            "pip": ["ray[tune]"],
+            "working_dir": args.working_dir,
+            "env_vars": {RAY_AIR_NEW_PERSISTENCE_MODE: "1"},
+        },
     )
     timeout_s = 10 * 60
     status = wait_until_finish(client=client, job_id=job_id, timeout_s=timeout_s)
4 changes: 2 additions & 2 deletions release/jobs_tests/workloads/run_simple_tune_job.py
@@ -1,7 +1,7 @@
 # From https://docs.ray.io/en/latest/tune/index.html

 import ray
-from ray import tune
+from ray import train, tune


 def objective(step, alpha, beta):
@@ -15,7 +15,7 @@ def training_function(config):
         # Iterative training function - can be any arbitrary training procedure.
         intermediate_score = objective(step, alpha, beta)
         # Feed the score back back to Tune.
-        tune.report(mean_loss=intermediate_score)
+        train.report(dict(mean_loss=intermediate_score))


 ray.init(address="auto")
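
For context, a training function migrated to `train.report` like this is launched through the Tune API elsewhere in the script; a rough sketch under the assumption of a `Tuner`-style launch (the search space below is illustrative, not part of this diff):

from ray import tune

tuner = tune.Tuner(
    training_function,
    param_space={
        "alpha": tune.grid_search([0.001, 0.01, 0.1]),
        "beta": tune.choice([1, 2, 3]),
    },
)
results = tuner.fit()
# Retrieve the best trial by the reported metric.
best = results.get_best_result(metric="mean_loss", mode="min")
print(best.config)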
