130 changes: 102 additions & 28 deletions python/ray/data/dataset.py
@@ -34,6 +34,7 @@
import numpy as np

import ray
import ray.cloudpickle as pickle
from ray.types import ObjectRef
from ray.util.annotations import DeveloperAPI, PublicAPI
from ray.data.block import (
@@ -2547,22 +2548,22 @@ def repeat(self, times: Optional[int] = None) -> "DatasetPipeline[T]":
to repeat indefinitely.
"""
from ray.data.dataset_pipeline import DatasetPipeline
from ray.data.impl.plan import _rewrite_read_stage

# If optimizations are enabled, rewrite the read stage into a OneToOneStage
# to enable fusion with downstream map stages.
ctx = DatasetContext.get_current()
if self._plan._is_read_stage() and ctx.optimize_fuse_read_stages:
self._plan._in_blocks.clear()
blocks, read_stage = self._plan._rewrite_read_stage()
outer_stats = DatasetStats(stages={}, parent=None)
if self._plan.is_read_stage() and ctx.optimize_fuse_read_stages:
blocks, _ = self._plan._get_source_blocks()
blocks.clear()
blocks, outer_stats, read_stage = _rewrite_read_stage(blocks)
else:
blocks = self._plan.execute()
read_stage = None
outer_stats = self._plan.stats()
read_stage = None
uuid = self._get_uuid()
outer_stats.dataset_uuid = uuid

if times is not None and times < 1:
raise ValueError("`times` must be >= 1, got {}".format(times))
uuid = self._get_uuid()

class Iterator:
def __init__(self, blocks):
@@ -2660,24 +2661,23 @@ def window(
exclusive with ``blocks_per_window``.
"""
from ray.data.dataset_pipeline import DatasetPipeline
from ray.data.impl.plan import _rewrite_read_stage

if blocks_per_window is not None and bytes_per_window is not None:
raise ValueError("Only one windowing scheme can be specified.")

if blocks_per_window is None:
blocks_per_window = 10

# If optimizations are enabled, rewrite the read stage into a OneToOneStage
# to enable fusion with downstream map stages.
ctx = DatasetContext.get_current()
if self._plan._is_read_stage() and ctx.optimize_fuse_read_stages:
self._plan._in_blocks.clear()
blocks, read_stage = self._plan._rewrite_read_stage()
outer_stats = DatasetStats(stages={}, parent=None)
if self._plan.is_read_stage() and ctx.optimize_fuse_read_stages:
blocks, _ = self._plan._get_source_blocks()
blocks.clear()
blocks, outer_stats, read_stage = _rewrite_read_stage(blocks)
else:
blocks = self._plan.execute()
read_stage = None
outer_stats = self._plan.stats()
read_stage = None

class Iterator:
def __init__(self, splits, epoch):
@@ -2755,22 +2755,20 @@ def fully_executed(self) -> "Dataset[T]":
Returns:
A Dataset with all blocks fully materialized in memory.
"""
blocks, metadata = [], []
for b, m in self._plan.execute().get_blocks_with_metadata():
blocks.append(b)
metadata.append(m)
ds = Dataset(
ExecutionPlan(
BlockList(blocks, metadata),
self._plan.stats(),
dataset_uuid=self._get_uuid(),
),
self._epoch,
lazy=False,
)
plan = self._plan.deep_copy(preserve_uuid=True)
plan.execute(force_read=True)
ds = Dataset(plan, self._epoch, lazy=False)
ds._set_uuid(self._get_uuid())
return ds

def is_fully_executed(self) -> bool:
"""Returns whether this Dataset has been fully executed.

This will return False if this Dataset is lazy and if the output of its final
stage hasn't been computed yet.
"""
return self._plan.has_computed_output()
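
For reference, a minimal usage sketch of the reworked fully_executed() together with the new is_fully_executed() check; the range()/map() pipeline and the use of _experimental_lazy() are illustrative assumptions, not part of this diff:

import ray

# Hypothetical usage sketch, not part of this PR.
ds = ray.data.range(1000)._experimental_lazy().map(lambda x: x * 2)
print(ds.is_fully_executed())            # typically False: the map stage has not run yet
materialized = ds.fully_executed()       # forces every stage to execute and materialize its blocks
print(materialized.is_fully_executed())  # expected True once the output blocks are computed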

def stats(self) -> str:
"""Returns a string containing execution timing information."""
return self._plan.stats().summary_string()
@@ -2794,6 +2792,82 @@ def _experimental_lazy(self) -> "Dataset[T]":
self._lazy = True
return self

def is_out_of_band_serializable(self) -> bool:
"""Whether this dataset is able to be out-of-band serialized, i.e. serialized
for us across different Ray clusters.. Only datasets that read from lazy
datasources (i.e. via one of the ray.data.read_*() APIs) are out-of-band
serializable.
"""
return self._plan.has_lazy_input()

@DeveloperAPI
def serialize_out_of_band(self) -> bytes:
"""
Serialize the Dataset for out-of-band use, i.e. for use across different Ray
clusters. This method serializes the lineage of the Dataset operations. Note
that this will drop all computed data, and that everything will be recomputed
from scratch after deserialization.

Use ``Dataset.deserialize_out_of_band`` to deserialize the serialized bytes
into a Dataset.

Returns:
Serialized bytes.
"""
if not self.is_out_of_band_serializable():
raise ValueError(
"Out-of-band serialization is only supported for Datasets created from "
"lazy datasources. Explicitly, out-of-band serialization is not "
"supported for any ray.data.from_*() APIs. To allow this Dataset to be "
"out-of-band serialized, write the data to an external store (such as "
"AWS S3, GCS, or Azure Blob Storage) using the Dataset.write_*() APIs, "
"and serialize a new dataset reading from the external store using the "
"ray.data.read_*() APIs."
)
# Copy Dataset and clear the execution plan so the Dataset is out-of-band
# serializable.
plan_copy = self._plan.deep_copy(preserve_uuid=True)
ds = Dataset(plan_copy, self._get_epoch(), self._lazy)
ds._plan.clear()
ds._set_uuid(self._get_uuid())

def _reduce(rf: ray.remote_function.RemoteFunction):
# Custom reducer for Ray remote function handles that allows for
# cross-cluster serialization.
# TODO(Clark): Fix this in core Ray.
reconstructor, args, state = rf.__reduce__()
# Manually unset last export session and job to force re-exporting of the
# function when the handle is deserialized on a new cluster.
state["_last_export_session_and_job"] = None
return reconstructor, args, state

context = ray.worker.global_worker.get_serialization_context()
try:
context._register_cloudpickle_reducer(
ray.remote_function.RemoteFunction, _reduce
)
serialized = pickle.dumps(ds)
finally:
context._unregister_cloudpickle_reducer(ray.remote_function.RemoteFunction)
return serialized

@DeveloperAPI
@staticmethod
def deserialize_out_of_band(serialized_ds: bytes) -> "Dataset":
"""
Deserialize the provided out-of-band serialized Dataset.

This assumes that the provided serialized bytes were serialized using
``Dataset.serialize_out_of_band``.

Args:
serialized_ds: The serialized Dataset that we wish to deserialize.

Returns:
A deserialized ``Dataset`` instance.
"""
return pickle.loads(serialized_ds)

def _split(
self, index: int, return_right_half: bool
) -> ("Dataset[T]", "Dataset[T]"):
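For reference, a usage sketch of the out-of-band serialization API added above; the read_parquet call, the S3 path, and the two-cluster framing are illustrative assumptions, not taken from this PR:

import ray

# Hypothetical usage sketch, not part of this PR.
# On cluster A: build a lazily-read Dataset and serialize its lineage.
ds = ray.data.read_parquet("s3://example-bucket/table")  # example path (assumption)
assert ds.is_out_of_band_serializable()
serialized = ds.serialize_out_of_band()  # plain bytes; ship them to the other cluster however you like

# On cluster B: reconstruct the Dataset. No data is carried over; everything is
# re-read from the datasource when the new Dataset executes.
ds2 = ray.data.Dataset.deserialize_out_of_band(serialized)
ds2.show(5)
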
9 changes: 7 additions & 2 deletions python/ray/data/dataset_pipeline.py
@@ -752,16 +752,21 @@ def _optimize_stages(self):
self._optimized_stages = self._stages
return

# This dummy dataset will be used to get a set of optimized stages.
dummy_ds = Dataset(
ExecutionPlan(BlockList([], []), DatasetStats(stages={}, parent=None)),
0,
True,
)
# Apply all pipeline operations to the dummy dataset.
for stage in self._stages:
dummy_ds = stage(dummy_ds)
dummy_ds._plan._optimize()
# Get the optimized stages.
_, _, stages = dummy_ds._plan._optimize()
# Apply these optimized stages to the datasets underlying the pipeline.
# These optimized stages will be executed by the PipelineExecutor.
optimized_stages = []
for stage in dummy_ds._plan._stages:
for stage in stages:
optimized_stages.append(
lambda ds, stage=stage: Dataset(
ds._plan.with_stage(stage), ds._epoch, True
5 changes: 5 additions & 0 deletions python/ray/data/impl/lazy_block_list.py
@@ -261,6 +261,11 @@ def _get_blocks_with_metadata(
self._cached_metadata = metadata
return block_refs, metadata

def compute_to_blocklist(self) -> BlockList:
"""Launch all tasks and return a concrete BlockList."""
blocks, metadata = self._get_blocks_with_metadata()
return BlockList(blocks, metadata)
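
A brief, hypothetical sketch of how this helper might be exercised; the range() source and the assumption that its input blocks are a LazyBlockList are for illustration only (the _get_source_blocks() accessor is taken from elsewhere in this diff):

import ray

# Hypothetical internal usage sketch, not part of this PR.
ds = ray.data.range(100)                         # Dataset backed by a lazy read
lazy_blocks, _ = ds._plan._get_source_blocks()   # assumed to be a LazyBlockList here
block_list = lazy_blocks.compute_to_blocklist()  # launch all read tasks eagerly
# block_list is now a concrete BlockList with all block refs and metadata computed.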

def compute_first_block(self):
"""Kick off computation for the first block in the list.
