Skip to content

Commit

Permalink
[Telemetry] Add Telemetry for Ray Train Utilities (#39363)
Browse files Browse the repository at this point in the history
Signed-off-by: woshiyyya <xiaoyunxuan1998@gmail.com>
  • Loading branch information
woshiyyya committed Sep 7, 2023
1 parent 3e8a1dc commit 449afc9
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 4 deletions.
8 changes: 8 additions & 0 deletions python/ray/train/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,14 @@ py_test(
deps = [":train_lib"]
)

# Runs the Ray Train usage-tag tests defined in tests/test_train_usage.py.
py_test(
name = "test_train_usage",
size = "medium",
srcs = ["tests/test_train_usage.py"],
tags = ["team:ml", "exclusive"],
deps = [":train_lib"],
)

py_test(
name = "test_training_iterator",
size = "large",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
LegacyTransformersCheckpoint,
)
from ray.util import PublicAPI
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -263,6 +264,10 @@ class RayTrainReportCallback(TrainerCallback):
"""

def __init__(self, *args, **kwargs):
    """Initialize the callback and record that it was used.

    All positional and keyword arguments are forwarded unchanged to the
    parent class initializer.
    """
    super().__init__(*args, **kwargs)
    # Telemetry: mark this utility as used via an extra usage tag.
    usage_tag = TagKey.TRAIN_TRANSFORMERS_RAYTRAINREPORTCALLBACK
    record_extra_usage_tag(usage_tag, "1")

def on_save(self, args, state, control, **kwargs):
"""Event called after a checkpoint save."""
with TemporaryDirectory() as tmpdir:
Expand Down Expand Up @@ -331,4 +336,5 @@ def get_eval_dataloader(

trainer.__class__ = RayTransformersTrainer

record_extra_usage_tag(TagKey.TRAIN_TRANSFORMERS_PREPARE_TRAINER, "1")
return trainer
20 changes: 20 additions & 0 deletions python/ray/train/lightning/_lightning_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ray.air.constants import MODEL_KEY
from ray.data.dataset import DataIterator
from ray.util import PublicAPI
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag

import logging
import shutil
Expand Down Expand Up @@ -63,6 +64,10 @@ class RayDDPStrategy(DDPStrategy):
https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.DDPStrategy.html
"""

def __init__(self, *args, **kwargs):
    """Initialize the strategy and record that it was used.

    All positional and keyword arguments are forwarded unchanged to the
    parent class initializer.
    """
    super().__init__(*args, **kwargs)
    # Telemetry: mark this utility as used via an extra usage tag.
    usage_tag = TagKey.TRAIN_LIGHTNING_RAYDDPSTRATEGY
    record_extra_usage_tag(usage_tag, "1")

@property
def root_device(self) -> torch.device:
return get_worker_root_device()
Expand All @@ -83,6 +88,10 @@ class RayFSDPStrategy(FSDPStrategy):
https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.FSDPStrategy.html
"""

def __init__(self, *args, **kwargs):
    """Initialize the strategy and record that it was used.

    All positional and keyword arguments are forwarded unchanged to the
    parent class initializer.
    """
    super().__init__(*args, **kwargs)
    # Telemetry: mark this utility as used via an extra usage tag.
    usage_tag = TagKey.TRAIN_LIGHTNING_RAYFSDPSTRATEGY
    record_extra_usage_tag(usage_tag, "1")

@property
def root_device(self) -> torch.device:
return get_worker_root_device()
Expand Down Expand Up @@ -122,6 +131,10 @@ class RayDeepSpeedStrategy(DeepSpeedStrategy):
https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.DeepSpeedStrategy.html
"""

def __init__(self, *args, **kwargs):
    """Initialize the strategy and record that it was used.

    All positional and keyword arguments are forwarded unchanged to the
    parent class initializer.
    """
    super().__init__(*args, **kwargs)
    # Telemetry: mark this utility as used via an extra usage tag.
    usage_tag = TagKey.TRAIN_LIGHTNING_RAYDEEPSPEEDSTRATEGY
    record_extra_usage_tag(usage_tag, "1")

@property
def root_device(self) -> torch.device:
return get_worker_root_device()
Expand All @@ -138,6 +151,10 @@ def distributed_sampler_kwargs(self) -> Dict[str, Any]:
class RayLightningEnvironment(LightningEnvironment):
"""Setup Lightning DDP training environment for Ray cluster."""

def __init__(self, *args, **kwargs):
    """Initialize the environment plugin and record that it was used.

    All positional and keyword arguments are forwarded unchanged to the
    parent class initializer.
    """
    super().__init__(*args, **kwargs)
    # Telemetry: mark this utility as used via an extra usage tag.
    usage_tag = TagKey.TRAIN_LIGHTNING_RAYLIGHTNINGENVIRONMENT
    record_extra_usage_tag(usage_tag, "1")

def world_size(self) -> int:
return train.get_context().get_world_size()

Expand Down Expand Up @@ -188,6 +205,7 @@ def prepare_trainer(trainer: pl.Trainer) -> pl.Trainer:
f"but got {type(cluster_environment)}!"
)

record_extra_usage_tag(TagKey.TRAIN_LIGHTNING_PREPARE_TRAINER, "1")
return trainer


Expand All @@ -203,6 +221,8 @@ def __init__(self) -> None:
if os.path.isdir(self.tmpdir_prefix) and self.local_rank == 0:
shutil.rmtree(self.tmpdir_prefix)

record_extra_usage_tag(TagKey.TRAIN_LIGHTNING_RAYTRAINREPORTCALLBACK, "1")

def on_train_epoch_end(self, trainer, pl_module) -> None:
# Creates a checkpoint dir with fixed name
tmpdir = os.path.join(self.tmpdir_prefix, str(trainer.current_epoch))
Expand Down
138 changes: 138 additions & 0 deletions python/ray/train/tests/test_train_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import pytest
import torch

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


@pytest.fixture
def shutdown_only():
    """Hand control to the test, then call ``ray.shutdown()`` as teardown."""
    yield
    # Teardown: runs once the test using this fixture has finished.
    ray.shutdown()


def run_torch():
    """Fit a two-worker CPU ``TorchTrainer`` that calls the torch utilities.

    The training function only invokes ``prepare_data_loader``,
    ``prepare_model`` and ``get_device``; no training step is performed.
    """
    from torch.utils.data import DataLoader, TensorDataset
    from ray.train.torch import get_device, prepare_model, prepare_data_loader

    def train_func():
        # Dummy model and data loader, built only so the utilities below
        # have something to operate on.
        model = torch.nn.Linear(10, 10)
        features = torch.randn(128, 10)
        labels = torch.randn(128, 1)
        dummy_dataset = TensorDataset(features, labels)
        dataloader = DataLoader(dummy_dataset, batch_size=32)

        # Exercise each torch utility once.
        prepare_data_loader(dataloader)
        prepare_model(model)
        get_device()

    cpu_scaling = ScalingConfig(num_workers=2, use_gpu=False)
    torch_trainer = TorchTrainer(train_func, scaling_config=cpu_scaling)
    torch_trainer.fit()


def run_lightning():
    """Fit a two-worker CPU ``TorchTrainer`` that builds the Lightning utilities.

    The training function instantiates every Ray Lightning strategy, the
    environment plugin and the report callback, then passes a
    ``pl.Trainer`` through ``prepare_trainer``. Nothing is trained.
    """
    import pytorch_lightning as pl
    from ray.train.lightning import (
        RayTrainReportCallback,
        RayDDPStrategy,
        RayFSDPStrategy,
        RayDeepSpeedStrategy,
        RayLightningEnvironment,
        prepare_trainer,
    )

    def train_func():
        # Exercise the Lightning utilities. The FSDP and DeepSpeed strategies
        # are constructed and discarded; only the DDP strategy is handed to
        # the trainer.
        RayFSDPStrategy()
        RayDeepSpeedStrategy()
        ddp_strategy = RayDDPStrategy()
        ray_environment = RayLightningEnvironment()
        report_callback = RayTrainReportCallback()

        pl_trainer = pl.Trainer(
            devices="auto",
            accelerator="auto",
            strategy=ddp_strategy,
            plugins=[ray_environment],
            callbacks=[report_callback],
        )
        pl_trainer = prepare_trainer(pl_trainer)

    cpu_scaling = ScalingConfig(num_workers=2, use_gpu=False)
    torch_trainer = TorchTrainer(train_func, scaling_config=cpu_scaling)
    torch_trainer.fit()


def run_transformers():
    """Fit a two-worker CPU ``TorchTrainer`` that uses the Transformers utilities.

    The training function builds a Hugging Face ``Trainer`` around a dummy
    model and dataset, attaches ``RayTrainReportCallback`` and passes the
    trainer through ``prepare_trainer``. Nothing is trained.
    """
    from datasets import Dataset
    from transformers import Trainer, TrainingArguments
    from ray.train.huggingface.transformers import (
        prepare_trainer,
        RayTrainReportCallback,
    )

    def train_func():
        # Dummy model and dataset, built only to construct a Trainer.
        dataset = Dataset.from_dict({"text": ["text1", "text2"], "label": [0, 1]})
        model = torch.nn.Linear(10, 10)

        # Exercise the Transformers utilities.
        training_args = TrainingArguments(output_dir="./results", no_cuda=True)
        hf_trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=dataset,
        )

        report_callback = RayTrainReportCallback()
        hf_trainer.add_callback(report_callback)
        hf_trainer = prepare_trainer(hf_trainer)

    cpu_scaling = ScalingConfig(num_workers=2, use_gpu=False)
    torch_trainer = TorchTrainer(train_func, scaling_config=cpu_scaling)
    torch_trainer.fit()


@pytest.mark.parametrize("framework", ["torch", "lightning", "transformers"])
def test_torch_utility_usage_tags(shutdown_only, framework):
    """Check that a framework's Train utilities report their usage tags.

    Starts Ray, runs the helper for ``framework``, then reads the extra
    usage tags through a GCS client and asserts that every expected tag
    name (the lower-cased ``TagKey`` enum name) is among the reported keys.
    """
    from ray._private.usage.usage_lib import TagKey, get_extra_usage_tags_to_report

    init_context = ray.init()
    gcs_address = init_context.address_info["gcs_address"]
    gcs_client = ray._raylet.GcsClient(address=gcs_address)

    if framework == "torch":
        run_torch()
        expected_tags = [
            TagKey.TRAIN_TORCH_GET_DEVICE,
            TagKey.TRAIN_TORCH_PREPARE_MODEL,
            TagKey.TRAIN_TORCH_PREPARE_DATALOADER,
        ]
    elif framework == "lightning":
        run_lightning()
        expected_tags = [
            TagKey.TRAIN_LIGHTNING_PREPARE_TRAINER,
            TagKey.TRAIN_LIGHTNING_RAYTRAINREPORTCALLBACK,
            TagKey.TRAIN_LIGHTNING_RAYDDPSTRATEGY,
            TagKey.TRAIN_LIGHTNING_RAYFSDPSTRATEGY,
            TagKey.TRAIN_LIGHTNING_RAYDEEPSPEEDSTRATEGY,
            TagKey.TRAIN_LIGHTNING_RAYLIGHTNINGENVIRONMENT,
        ]
    elif framework == "transformers":
        run_transformers()
        expected_tags = [
            TagKey.TRAIN_TRANSFORMERS_PREPARE_TRAINER,
            TagKey.TRAIN_TRANSFORMERS_RAYTRAINREPORTCALLBACK,
        ]

    reported_tags = get_extra_usage_tags_to_report(gcs_client)
    # Reported keys are compared against the lower-cased enum names.
    expected_names = {TagKey.Name(tag).lower() for tag in expected_tags}
    assert expected_names.issubset(reported_tags.keys())


if __name__ == "__main__":
    import sys

    # Run this file under pytest: verbose output, stop at the first failure,
    # and use pytest's return value as the process exit code.
    pytest_args = ["-v", "-x", __file__]
    exit_code = pytest.main(pytest_args)
    sys.exit(exit_code)
10 changes: 6 additions & 4 deletions python/ray/train/torch/train_loop_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,20 @@
import random
import types
import collections
import numpy as np
from packaging.version import Version

from typing import Any, Dict, List, Optional, Callable, Union

from ray.train._internal import session
from ray.train._internal.accelerator import Accelerator
from torch.optim import Optimizer
from ray.train._internal.session import get_accelerator, set_accelerator
from ray.util.annotations import PublicAPI, Deprecated

import numpy as np
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag

import torch
from torch.cuda.amp import autocast, GradScaler
from torch.nn.parallel import DistributedDataParallel
from torch.optim import Optimizer

if Version(torch.__version__) < Version("1.11.0"):
FullyShardedDataParallel = None
Expand Down Expand Up @@ -67,6 +66,7 @@ def get_device() -> Union[torch.device, List[torch.device]]:
"""
from ray.air._internal import torch_utils

record_extra_usage_tag(TagKey.TRAIN_TORCH_GET_DEVICE, "1")
return torch_utils.get_device()


Expand Down Expand Up @@ -104,6 +104,7 @@ def prepare_model(
"Run `pip install 'torch>=1.11.0'` to use FullyShardedDataParallel."
)

record_extra_usage_tag(TagKey.TRAIN_TORCH_PREPARE_MODEL, "1")
return get_accelerator(_TorchAccelerator).prepare_model(
model,
move_to_device=move_to_device,
Expand Down Expand Up @@ -138,6 +139,7 @@ def prepare_data_loader(
regardless of the setting. This configuration will be ignored
if ``move_to_device`` is False.
"""
record_extra_usage_tag(TagKey.TRAIN_TORCH_PREPARE_DATALOADER, "1")
return get_accelerator(_TorchAccelerator).prepare_data_loader(
data_loader,
add_dist_sampler=add_dist_sampler,
Expand Down
13 changes: 13 additions & 0 deletions src/ray/protobuf/usage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,17 @@ enum TagKey {
// AIR entrypoint
// One of: "Trainer.fit", "Tuner.fit", "tune.run", "tune.run_experiments"
AIR_ENTRYPOINT = 508;

// Train Utilities
TRAIN_TORCH_GET_DEVICE = 509;
TRAIN_TORCH_PREPARE_MODEL = 510;
TRAIN_TORCH_PREPARE_DATALOADER = 511;
TRAIN_LIGHTNING_PREPARE_TRAINER = 512;
TRAIN_LIGHTNING_RAYTRAINREPORTCALLBACK = 513;
TRAIN_LIGHTNING_RAYDDPSTRATEGY = 514;
TRAIN_LIGHTNING_RAYFSDPSTRATEGY = 515;
TRAIN_LIGHTNING_RAYDEEPSPEEDSTRATEGY = 516;
TRAIN_LIGHTNING_RAYLIGHTNINGENVIRONMENT = 517;
TRAIN_TRANSFORMERS_PREPARE_TRAINER = 518;
TRAIN_TRANSFORMERS_RAYTRAINREPORTCALLBACK = 519;
}

0 comments on commit 449afc9

Please sign in to comment.