[data] Stability & accuracy improvements for Data+Train benchmark (#42027)


    Shuffles input images for the read_images_train_4_gpu release test, which fixes the issue of accuracy dropping to 0.
    Adds AWS Error NETWORK_CONNECTION and AWS Error ACCESS_DENIED as exception types to retry during reads, since these can be transient errors that succeed on retry (see the retry sketch below).
    Other small fixes for optional parameters in the benchmark file, used for debugging.
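
The retry change relies on matching error-message substrings and backing off between attempts. Below is a minimal sketch of such a helper; the keyword signature (f, match, description, max_attempts, max_backoff_s) comes from the diff further down, while the exponential backoff and jitter details are assumptions, so the actual ray.data._internal.util.call_with_retry may differ.

import random
import time


def call_with_retry(f, match, description, max_attempts=10, max_backoff_s=32):
    """Call ``f``, retrying when the error message matches a pattern in ``match``."""
    for attempt in range(max_attempts):
        try:
            return f()
        except Exception as e:
            is_transient = any(pattern in str(e) for pattern in match)
            if not is_transient or attempt == max_attempts - 1:
                raise
            # Exponential backoff with jitter, capped at max_backoff_s (assumed).
            backoff_s = min(2**attempt, max_backoff_s) * random.random()
            print(f"Retrying {description} in {backoff_s:.1f}s after error: {e}")
            time.sleep(backoff_s)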

Results of a sample release test run:

    read_images_train_4_gpu:

Result of case cache-none: {'time': 11964.644112934, 'tput': 429.22158930338344, 'accuracy': 0.4667895757295709, 'extra_metrics': {}}

    read_images_train_16_gpu:

Result of case cache-none: {'time': 5400.357632072, 'tput': 1593.6668981608586, 'accuracy': 0.5293150227295434, 'extra_metrics': {}}

    read_images_train_16_gpu_preserve_order:

Result of case cache-none: {'time': 5566.524269388, 'tput': 1571.1312653719967, 'accuracy': 0.5295374787691078, 'extra_metrics': {}}

(The difference in accuracy is because the 4 worker test runs for only 3 epochs, while the 16 worker tests run for 5 epochs, using the entire dataset per epoch.)

---------

Signed-off-by: Andrew Xue <andewzxue@gmail.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Co-authored-by: Scott Lee <scottjlee@users.noreply.github.com>
Co-authored-by: Scott Lee <sjl@anyscale.com>
3 people committed Jan 9, 2024
1 parent f18daef commit b87ed2c
Showing 2 changed files with 19 additions and 4 deletions.
python/ray/data/_internal/planner/plan_read_op.py (16 additions, 2 deletions)
@@ -14,13 +14,18 @@
     MapTransformFn,
 )
 from ray.data._internal.logical.operators.read_operator import Read
-from ray.data._internal.util import _warn_on_high_parallelism
+from ray.data._internal.util import _warn_on_high_parallelism, call_with_retry
 from ray.data.block import Block
 from ray.data.context import DataContext
 from ray.data.datasource.datasource import ReadTask
 
 TASK_SIZE_WARN_THRESHOLD_BYTES = 100000
 
+# Transient errors that can occur during longer reads. Trigger retry when these occur.
+READ_FILE_RETRY_ON_ERRORS = ["AWS Error NETWORK_CONNECTION", "AWS Error ACCESS_DENIED"]
+READ_FILE_MAX_ATTEMPTS = 10
+READ_FILE_RETRY_MAX_BACKOFF_SECONDS = 32
+
 
 # Defensively compute the size of the block as the max size reported by the
 # datasource and the actual read task size. This is to guard against issues
@@ -78,8 +83,17 @@ def get_input_data(target_max_block_size) -> List[RefBundle]:
     )
 
     def do_read(blocks: Iterable[ReadTask], _: TaskContext) -> Iterable[Block]:
+        """Yield from read tasks, with retry logic upon transient read errors."""
         for read_task in blocks:
-            yield from read_task()
+            read_fn_name = read_task._read_fn.__name__
+
+            yield from call_with_retry(
+                f=read_task,
+                match=READ_FILE_RETRY_ON_ERRORS,
+                description=f"read file {read_fn_name}",
+                max_attempts=READ_FILE_MAX_ATTEMPTS,
+                max_backoff_s=READ_FILE_RETRY_MAX_BACKOFF_SECONDS,
+            )
 
     # Create a MapTransformer for a read operator
     transform_fns: List[MapTransformFn] = [
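
As a hypothetical illustration of the new do_read path, the toy below exercises the retry loop with a reader that raises a transient AWS error twice before succeeding; flaky_read_task and the attempt counter are invented for the demo, while the constants and call_with_retry are the ones from the diff above.

attempts = {"n": 0}


def flaky_read_task():
    # Stands in for a ReadTask: fails twice with a retryable error, then succeeds.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("AWS Error NETWORK_CONNECTION during GetObject")
    return [{"image": "..."}]


blocks = call_with_retry(
    f=flaky_read_task,
    match=READ_FILE_RETRY_ON_ERRORS,
    description="read file flaky_read_task",
    max_attempts=READ_FILE_MAX_ATTEMPTS,
    max_backoff_s=READ_FILE_RETRY_MAX_BACKOFF_SECONDS,
)
assert attempts["n"] == 3  # the two transient failures were retried away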
release/nightly_tests/dataset/multi_node_train_benchmark.py (3 additions, 2 deletions)
@@ -58,7 +58,7 @@ def parse_args():
     parser.add_argument(
         "--skip-train-model",
         default=False,
-        type=bool,
+        action="store_true",
         help="Whether to skip training a model (i.e. only consume data). "
         "Set to True if file_type == 'parquet'.",
     )
@@ -327,7 +327,7 @@ def train_loop_per_worker():
         end_t = time.time()
 
         epoch_accuracy_val = None
-        if run_validation_set:
+        if run_validation_set and not args.skip_train_model:
             print(f"Starting validation set for epoch {epoch+1}")
             num_correct_val = 0
             num_rows_val = 0
@@ -554,6 +554,7 @@ def benchmark_code(
     ray_dataset = ray.data.read_images(
         input_paths,
         mode="RGB",
+        shuffle="files",
        partitioning=partitioning,
     )
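
The type=bool fix in the first hunk deserves a note: argparse applies bool() to the raw command-line string, and bool of any non-empty string is True, so --skip-train-model False would still enable the flag. action="store_true" turns it into a proper flag. A minimal sketch (flag names are hypothetical):

import argparse

parser = argparse.ArgumentParser()
# Buggy pattern: bool("False") == True, so any explicit value parses as True.
parser.add_argument("--buggy-flag", default=False, type=bool)
# Fixed pattern: False unless the flag is present, no value expected.
parser.add_argument("--fixed-flag", default=False, action="store_true")

args = parser.parse_args(["--buggy-flag", "False", "--fixed-flag"])
assert args.buggy_flag is True  # surprising: this is the bug being fixed
assert args.fixed_flag is True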
