-
Notifications
You must be signed in to change notification settings - Fork 5.4k
/
iterator.py
861 lines (751 loc) · 37.4 KB
/
iterator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
import abc
import time
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
Tuple,
TypeVar,
Union,
)
import numpy as np
from ray.data._internal.block_batching.iter_batches import iter_batches
from ray.data._internal.stats import DatasetStats, StatsManager
from ray.data.block import (
Block,
BlockAccessor,
BlockMetadata,
DataBatch,
_apply_strict_mode_batch_format,
)
from ray.types import ObjectRef
from ray.util.annotations import PublicAPI
if TYPE_CHECKING:
import tensorflow as tf
import torch
from ray.data.dataset import (
CollatedData,
Schema,
TensorFlowTensorBatchType,
TorchBatchType,
)
T = TypeVar("T")
class _IterableFromIterator(Iterable[T]):
def __init__(self, iterator_gen: Callable[[], Iterator[T]]):
"""Constructs an Iterable from an iterator generator.
Args:
iterator_gen: A function that returns an iterator each time it
is called. For example, this can be a generator function.
"""
self.iterator_gen = iterator_gen
def __iter__(self):
return self.iterator_gen()
@PublicAPI(stability="beta")
class DataIterator(abc.ABC):
    """An iterator for reading records from a :class:`~Dataset`.

    For Datasets, each iteration call represents a complete read of all items in the
    Dataset.

    If using Ray Train, each trainer actor should get its own iterator by calling
    :meth:`ray.train.get_dataset_shard("train")
    <ray.train.get_dataset_shard>`.

    Examples:
        >>> import ray
        >>> ds = ray.data.range(5)
        >>> ds
        Dataset(num_blocks=..., num_rows=5, schema={id: int64})
        >>> ds.iterator()
        DataIterator(Dataset(num_blocks=..., num_rows=5, schema={id: int64}))
    """

    @abc.abstractmethod
    def _to_block_iterator(
        self,
    ) -> Tuple[
        Iterator[Tuple[ObjectRef[Block], BlockMetadata]],
        Optional[DatasetStats],
        bool,
    ]:
        """Returns the iterator to use for `iter_batches`.

        Returns:
            A tuple. The first item of the tuple is an iterator over pairs of Block
            object references and their corresponding metadata. The second item of the
            tuple is a DatasetStats object used for recording stats during iteration.
            The third item is a boolean indicating if the blocks can be safely cleared
            after use.
        """
        raise NotImplementedError

    def iter_batches(
        self,
        *,
        prefetch_batches: int = 1,
        batch_size: Optional[int] = 256,
        batch_format: Optional[str] = "default",
        drop_last: bool = False,
        local_shuffle_buffer_size: Optional[int] = None,
        local_shuffle_seed: Optional[int] = None,
        _collate_fn: Optional[Callable[[DataBatch], "CollatedData"]] = None,
        _finalize_fn: Optional[Callable[[Any], Any]] = None,
        # Deprecated.
        prefetch_blocks: int = 0,
    ) -> Iterable[DataBatch]:
        """Return a batched iterable over the dataset.

        The returned object is re-iterable: every call to ``iter()`` on it starts a
        fresh pass over the dataset.

        Examples:
            >>> import ray
            >>> for batch in ray.data.range(
            ...     1000000
            ... ).iterator().iter_batches(): # doctest: +SKIP
            ...     print(batch) # doctest: +SKIP

        Time complexity: O(1)

        Args:
            prefetch_batches: The number of batches to fetch ahead of the current
                batch. If set to greater than 0, a separate threadpool will be used
                to fetch the objects to the local node, format the batches, and apply
                the collate_fn. Defaults to 1.
            batch_size: The number of rows in each batch, or None to use entire blocks
                as batches (blocks may contain different number of rows).
                The final batch may include fewer than ``batch_size`` rows if
                ``drop_last`` is ``False``. Defaults to 256.
            batch_format: Specify ``"default"`` to use the default block format
                (NumPy), ``"pandas"`` to select ``pandas.DataFrame``, "pyarrow" to
                select ``pyarrow.Table``, or ``"numpy"`` to select
                ``Dict[str, numpy.ndarray]``, or None to return the underlying block
                exactly as is with no additional formatting.
            drop_last: Whether to drop the last batch if it's incomplete.
            local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
                using a local in-memory shuffle buffer, and this value will serve as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer will be drained.
            local_shuffle_seed: The seed to use for the local random shuffle.
            _collate_fn: Internal. Optional function forwarded to the batching
                pipeline as ``collate_fn``.
            _finalize_fn: Internal. Optional function forwarded to the batching
                pipeline as ``finalize_fn``.
            prefetch_blocks: Deprecated. Any value greater than 0 raises; use
                ``prefetch_batches`` instead.

        Returns:
            An iterable over record batches.

        Raises:
            DeprecationWarning: If ``prefetch_blocks`` is greater than 0.
        """
        if prefetch_blocks > 0:
            raise DeprecationWarning(
                "`prefetch_blocks` arg is deprecated in Ray 2.4. Use "
                "the `prefetch_batches` arg instead to specify the amount of "
                "prefetching in terms of batches instead of blocks."
            )

        batch_format = _apply_strict_mode_batch_format(batch_format)

        def _create_iterator() -> Iterator[DataBatch]:
            time_start = time.perf_counter()
            # Iterate through the dataset from the start each time
            # _create_iterator is called.
            # This allows multiple iterations of the dataset without
            # needing to explicitly call `iter_batches()` multiple times.
            block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()

            iterator = iter(
                iter_batches(
                    block_iterator,
                    stats=stats,
                    clear_block_after_read=blocks_owned_by_consumer,
                    batch_size=batch_size,
                    batch_format=batch_format,
                    drop_last=drop_last,
                    collate_fn=_collate_fn,
                    finalize_fn=_finalize_fn,
                    shuffle_buffer_min_size=local_shuffle_buffer_size,
                    shuffle_seed=local_shuffle_seed,
                    prefetch_batches=prefetch_batches,
                )
            )

            dataset_tag = self._get_dataset_tag()

            if stats:
                stats.iter_initialize_s.add(time.perf_counter() - time_start)

            for batch in iterator:
                yield batch
                # Refresh the live iteration metrics after each consumed batch.
                StatsManager.update_iteration_metrics(stats, dataset_tag)
            # Iteration finished normally; partially consumed iterators are
            # cleaned up in __del__ instead.
            StatsManager.clear_iteration_metrics(dataset_tag)

            if stats:
                stats.iter_total_s.add(time.perf_counter() - time_start)

        return _IterableFromIterator(_create_iterator)

    def _get_dataset_tag(self) -> str:
        """Return the tag under which iteration metrics are reported.

        This base implementation returns a generic placeholder tag.
        """
        return "unknown_dataset"

    def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterable[Dict[str, Any]]:
        """Return a local row iterable over the dataset.

        If the dataset is a tabular dataset (Arrow/Pandas blocks), dicts
        are yielded for each row by the iterator. If the dataset is not tabular,
        the raw row is yielded.

        Examples:
            >>> import ray
            >>> dataset = ray.data.range(10)
            >>> next(iter(dataset.iterator().iter_rows()))
            {'id': 0}

        Time complexity: O(1)

        Args:
            prefetch_blocks: The number of blocks to prefetch ahead of the
                current block during the scan. Forwarded to ``iter_batches`` as
                ``prefetch_batches``; because rows are read with
                ``batch_size=None`` (whole blocks per batch), one batch is one block.

        Returns:
            An iterable over rows of the dataset.
        """
        iter_batch_args = {"batch_size": None, "batch_format": None}
        iter_batch_args["prefetch_batches"] = prefetch_blocks

        batch_iterable = self.iter_batches(**iter_batch_args)

        def _wrapped_iterator():
            for batch in batch_iterable:
                # Batches are unformatted (batch_format=None); wrap each one back
                # into a block accessor to iterate its rows in public row format.
                batch = BlockAccessor.for_block(BlockAccessor.batch_to_block(batch))
                for row in batch.iter_rows(public_row_format=True):
                    yield row

        return _IterableFromIterator(_wrapped_iterator)

    @abc.abstractmethod
    def stats(self) -> str:
        """Returns a string containing execution timing information."""
        raise NotImplementedError

    @abc.abstractmethod
    def schema(self) -> "Schema":
        """Return the schema of the dataset iterated over."""
        raise NotImplementedError

    def iter_torch_batches(
        self,
        *,
        prefetch_batches: int = 1,
        batch_size: Optional[int] = 256,
        dtypes: Optional[Union["torch.dtype", Dict[str, "torch.dtype"]]] = None,
        device: str = "auto",
        collate_fn: Optional[Callable[[Dict[str, np.ndarray]], "CollatedData"]] = None,
        drop_last: bool = False,
        local_shuffle_buffer_size: Optional[int] = None,
        local_shuffle_seed: Optional[int] = None,
        # Deprecated.
        prefetch_blocks: int = 0,
    ) -> Iterable["TorchBatchType"]:
        """Return a batched iterable of Torch Tensors over the dataset.

        This iterable yields a dictionary of column-tensors. If you are looking for
        more flexibility in the tensor conversion (e.g. casting dtypes) or the batch
        format, try using :meth:`~ray.data.iterator.DataIterator.iter_batches` directly.

        Examples:
            >>> import ray
            >>> for batch in ray.data.range(
            ...     12,
            ... ).iterator().iter_torch_batches(batch_size=4):
            ...     print(batch)
            {'id': tensor([0, 1, 2, 3])}
            {'id': tensor([4, 5, 6, 7])}
            {'id': tensor([ 8,  9, 10, 11])}

            Use the ``collate_fn`` to customize how the tensor batch is created.

            >>> from typing import Any, Dict
            >>> import torch
            >>> import numpy as np
            >>> import ray
            >>> def collate_fn(batch: Dict[str, np.ndarray]) -> Any:
            ...     return torch.stack(
            ...         [torch.as_tensor(array) for array in batch.values()],
            ...         axis=1
            ...     )
            >>> iterator = ray.data.from_items([
            ...     {"col_1": 1, "col_2": 2},
            ...     {"col_1": 3, "col_2": 4}]).iterator()
            >>> for batch in iterator.iter_torch_batches(collate_fn=collate_fn):
            ...     print(batch)
            tensor([[1, 2],
                    [3, 4]])

        Time complexity: O(1)

        Args:
            prefetch_batches: The number of batches to fetch ahead of the current
                batch. If set to greater than 0, a separate threadpool will be used
                to fetch the objects to the local node, format the batches, and apply
                the collate_fn. Defaults to 1.
            batch_size: The number of rows in each batch, or None to use entire blocks
                as batches (blocks may contain different number of rows).
                The final batch may include fewer than ``batch_size`` rows if
                ``drop_last`` is ``False``. Defaults to 256.
            dtypes: The Torch dtype(s) for the created tensor(s); if None, the dtype
                will be inferred from the tensor data. You can't use this parameter
                with ``collate_fn``.
            device: The device on which the tensor should be placed. Defaults to
                "auto" which moves the tensors to the appropriate device when the
                Dataset is passed to Ray Train and ``collate_fn`` is not provided.
                Otherwise, defaults to CPU. You can't use this parameter with
                ``collate_fn``.
            collate_fn: A function to convert a Numpy batch to a PyTorch tensor batch.
                When this parameter is specified, the user should manually handle the
                host to device data transfer outside of ``collate_fn``.
                This is useful for further processing the data after it has been
                batched. Potential use cases include collating along a dimension other
                than the first, padding sequences of various lengths, or generally
                handling batches of different length tensors. If not provided, the
                default collate function is used which simply converts the batch of
                numpy arrays to a batch of PyTorch tensors. This API is still
                experimental and is subject to change. You can't use this parameter in
                conjunction with ``dtypes`` or ``device``.
            drop_last: Whether to drop the last batch if it's incomplete.
            local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
                using a local in-memory shuffle buffer, and this value will serve as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer will be drained. This
                buffer size must be greater than or equal to ``batch_size``, and
                therefore ``batch_size`` must also be specified when using local
                shuffling.
            local_shuffle_seed: The seed to use for the local random shuffle.
            prefetch_blocks: Deprecated; forwarded to ``iter_batches``, which raises
                if it is greater than 0.

        Returns:
            An iterable over Torch Tensor batches.

        Raises:
            ValueError: If ``collate_fn`` is combined with ``dtypes`` or a
                non-"auto" ``device``.
        """
        from ray.air._internal.torch_utils import (
            convert_ndarray_batch_to_torch_tensor_batch,
        )
        from ray.train.torch import get_device

        if collate_fn is not None and (dtypes is not None or device != "auto"):
            # Each literal ends with a space so the concatenated message reads
            # correctly.
            raise ValueError(
                "collate_fn cannot be used with dtypes and device. "
                "You should manually move the output Torch tensors to the "
                "desired dtype and device outside of collate_fn."
            )

        if device == "auto":
            # Use the appropriate device for Ray Train, or falls back to CPU if
            # Ray Train is not being used.
            device = get_device()

        if collate_fn is None:
            # The default collate_fn handles formatting and Tensor creation.
            # Here, we set device=None to defer host to device data transfer
            # to the subsequent finalize_fn.
            def collate_fn(batch: Union[np.ndarray, Dict[str, np.ndarray]]):
                return convert_ndarray_batch_to_torch_tensor_batch(
                    batch,
                    dtypes=dtypes,
                    device=None,
                )

            # The default finalize_fn handles the host to device data transfer.
            # This is executed in a 1-thread pool separately from collate_fn
            # to allow independent parallelism of these steps.
            def finalize_fn(batch: Union["torch.Tensor", Dict[str, "torch.Tensor"]]):
                if device is not None:
                    if isinstance(batch, dict):
                        for k, t in batch.items():
                            batch[k] = t.to(device=device)
                    else:
                        batch = batch.to(device=device)
                return batch

        else:
            # A user-supplied collate_fn owns device placement.
            finalize_fn = None

        return self.iter_batches(
            prefetch_batches=prefetch_batches,
            prefetch_blocks=prefetch_blocks,
            batch_size=batch_size,
            drop_last=drop_last,
            local_shuffle_buffer_size=local_shuffle_buffer_size,
            local_shuffle_seed=local_shuffle_seed,
            _collate_fn=collate_fn,
            _finalize_fn=finalize_fn,
        )

    def iter_tf_batches(
        self,
        *,
        prefetch_batches: int = 1,
        batch_size: Optional[int] = 256,
        dtypes: Optional[Union["tf.dtypes.DType", Dict[str, "tf.dtypes.DType"]]] = None,
        drop_last: bool = False,
        local_shuffle_buffer_size: Optional[int] = None,
        local_shuffle_seed: Optional[int] = None,
        # Deprecated.
        prefetch_blocks: int = 0,
    ) -> Iterable["TensorFlowTensorBatchType"]:
        """Return a batched iterable of TensorFlow Tensors over the dataset.

        This iterable yields a dictionary of column-tensors (one tensor per
        column of each batch).

        .. tip::
            If you don't need the additional flexibility provided by this method,
            consider using :meth:`~ray.data.Dataset.to_tf` instead. It's easier
            to use.

        Examples:
            >>> import ray
            >>> for batch in ray.data.range( # doctest: +SKIP
            ...     12,
            ... ).iterator().iter_tf_batches(batch_size=4):
            ...     print(batch["id"].shape) # doctest: +SKIP
            (4,)
            (4,)
            (4,)

        Time complexity: O(1)

        Args:
            prefetch_batches: The number of batches to fetch ahead of the current
                batch. If set to greater than 0, a separate threadpool will be used
                to fetch the objects to the local node, format the batches, and apply
                the collate_fn. Defaults to 1.
            batch_size: The number of rows in each batch, or None to use entire blocks
                as batches (blocks may contain different number of rows).
                The final batch may include fewer than ``batch_size`` rows if
                ``drop_last`` is ``False``. Defaults to 256.
            dtypes: The TensorFlow dtype(s) for the created tensor(s); if None, the
                dtype will be inferred from the tensor data.
            drop_last: Whether to drop the last batch if it's incomplete.
            local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
                using a local in-memory shuffle buffer, and this value will serve as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer will be drained. This
                buffer size must be greater than or equal to ``batch_size``, and
                therefore ``batch_size`` must also be specified when using local
                shuffling.
            local_shuffle_seed: The seed to use for the local random shuffle.
            prefetch_blocks: Deprecated; forwarded to ``iter_batches``, which raises
                if it is greater than 0.

        Returns:
            An iterator over TensorFlow Tensor batches. Note that, unlike the result
            of ``iter_batches``, it is exhausted after a single pass.
        """
        from ray.air._internal.tensorflow_utils import (
            convert_ndarray_batch_to_tf_tensor_batch,
        )

        batch_iterable = self.iter_batches(
            prefetch_batches=prefetch_batches,
            prefetch_blocks=prefetch_blocks,
            batch_size=batch_size,
            drop_last=drop_last,
            local_shuffle_buffer_size=local_shuffle_buffer_size,
            local_shuffle_seed=local_shuffle_seed,
        )
        mapped_iterable = map(
            lambda batch: convert_ndarray_batch_to_tf_tensor_batch(
                batch, dtypes=dtypes
            ),
            batch_iterable,
        )

        return mapped_iterable

    def to_torch(
        self,
        *,
        label_column: Optional[str] = None,
        feature_columns: Optional[
            Union[List[str], List[List[str]], Dict[str, List[str]]]
        ] = None,
        label_column_dtype: Optional["torch.dtype"] = None,
        feature_column_dtypes: Optional[
            Union["torch.dtype", List["torch.dtype"], Dict[str, "torch.dtype"]]
        ] = None,
        batch_size: int = 1,
        prefetch_batches: int = 1,
        drop_last: bool = False,
        local_shuffle_buffer_size: Optional[int] = None,
        local_shuffle_seed: Optional[int] = None,
        unsqueeze_label_tensor: bool = True,
        unsqueeze_feature_tensors: bool = True,
        # Deprecated.
        prefetch_blocks: int = 0,
    ) -> "torch.utils.data.IterableDataset":
        """Return a Torch IterableDataset over this dataset.

        This is only supported for datasets convertible to Arrow records.

        It is recommended to use the returned ``IterableDataset`` directly
        instead of passing it into a torch ``DataLoader``.

        Each element in IterableDataset will be a tuple consisting of 2
        elements. The first item contains the feature tensor(s), and the
        second item is the label tensor. Those can take on different
        forms, depending on the specified arguments.

        For the features tensor (N is the ``batch_size`` and n, m, k
        are the number of features per tensor):

        * If ``feature_columns`` is a ``List[str]``, the features will be
          a tensor of shape (N, n), with columns corresponding to
          ``feature_columns``

        * If ``feature_columns`` is a ``List[List[str]]``, the features will be
          a list of tensors of shape [(N, m),...,(N, k)], with columns of each
          tensor corresponding to the elements of ``feature_columns``

        * If ``feature_columns`` is a ``Dict[str, List[str]]``, the features
          will be a dict of key-tensor pairs of shape
          {key1: (N, m),..., keyN: (N, k)}, with columns of each
          tensor corresponding to the value of ``feature_columns`` under the
          key.

        If ``unsqueeze_label_tensor=True`` (default), the label tensor will be
        of shape (N, 1). Otherwise, it will be of shape (N,).
        If ``label_column`` is specified as ``None``, then no column from the
        ``Dataset`` will be treated as the label, and the output label tensor
        will be ``None``.

        Note that you probably want to call ``.split()`` on this dataset if
        there are to be multiple Torch workers consuming the data.

        Time complexity: O(1)

        Args:
            label_column: The name of the column used as the
                label (second element of the output list). Can be None for
                prediction, in which case the second element of returned
                tuple will also be None.
            feature_columns: The names of the columns
                to use as the features. Can be a list of lists or
                a dict of string-list pairs for multi-tensor output.
                If None, then use all columns except the label column as
                the features.
            label_column_dtype: The torch dtype to
                use for the label column. If None, then automatically infer
                the dtype.
            feature_column_dtypes: The dtypes to use for the feature
                tensors. This should match the format of ``feature_columns``,
                or be a single dtype, in which case it will be applied to
                all tensors. If None, then automatically infer the dtype.
            batch_size: How many samples per batch to yield at a time.
                Defaults to 1.
            prefetch_batches: The number of batches to fetch ahead of the current
                batch. If set to greater than 0, a separate threadpool will be used
                to fetch the objects to the local node, format the batches, and apply
                the collate_fn. Defaults to 1.
            drop_last: Set to True to drop the last incomplete batch,
                if the dataset size is not divisible by the batch size. If
                False and the size of dataset is not divisible by the batch
                size, then the last batch will be smaller. Defaults to False.
            local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
                using a local in-memory shuffle buffer, and this value will serve as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer will be drained. This
                buffer size must be greater than or equal to ``batch_size``, and
                therefore ``batch_size`` must also be specified when using local
                shuffling.
            local_shuffle_seed: The seed to use for the local random shuffle.
            unsqueeze_label_tensor: If set to True, the label tensor
                will be unsqueezed (reshaped to (N, 1)). Otherwise, it will
                be left as is, that is (N, ). In general, regression loss
                functions expect an unsqueezed tensor, while classification
                loss functions expect a squeezed one. Defaults to True.
            unsqueeze_feature_tensors: If set to True, the features tensors
                will be unsqueezed (reshaped to (N, 1)) before being concatenated into
                the final features tensor. Otherwise, they will be left as is, that is
                (N, ). Defaults to True.
            prefetch_blocks: Deprecated; forwarded to ``iter_batches``, which raises
                if it is greater than 0.

        Returns:
            A torch IterableDataset.

        Raises:
            TypeError: If ``feature_column_dtypes`` does not match the structure of
                ``feature_columns`` (including a non-single dtype when
                ``feature_columns`` is None).
            ValueError: If dict keys / list lengths of ``feature_columns`` and
                ``feature_column_dtypes`` disagree, or a column group is empty.
        """
        import torch

        from ray.air._internal.torch_utils import convert_pandas_to_torch_tensor
        from ray.data._internal.torch_iterable_dataset import TorchIterableDataset

        # If an empty collection is passed in, treat it the same as None
        if not feature_columns:
            feature_columns = None

        # Validate eagerly so mismatches surface here rather than mid-iteration.
        if feature_column_dtypes and not isinstance(feature_column_dtypes, torch.dtype):
            if isinstance(feature_columns, dict):
                if not isinstance(feature_column_dtypes, dict):
                    raise TypeError(
                        "If `feature_columns` is a dict, "
                        "`feature_column_dtypes` must be None, `torch.dtype`,"
                        f" or dict, got {type(feature_column_dtypes)}."
                    )
                if set(feature_columns) != set(feature_column_dtypes):
                    raise ValueError(
                        "`feature_columns` and `feature_column_dtypes` "
                        "must have the same keys."
                    )
                if any(not subcolumns for subcolumns in feature_columns.values()):
                    raise ValueError("column list may not be empty")
            elif feature_columns is None:
                # All non-label columns form a single tensor, so only a single
                # dtype makes sense. Previously this path crashed with an opaque
                # "'NoneType' object is not subscriptable".
                raise TypeError(
                    "If `feature_columns` is None, "
                    "`feature_column_dtypes` must be None or a single `torch.dtype`,"
                    f" got {type(feature_column_dtypes)}."
                )
            elif isinstance(feature_columns[0], (list, tuple)):
                if not isinstance(feature_column_dtypes, (list, tuple)):
                    raise TypeError(
                        "If `feature_columns` is a list of lists, "
                        "`feature_column_dtypes` must be None, `torch.dtype`,"
                        f" or a sequence, got {type(feature_column_dtypes)}."
                    )
                if len(feature_columns) != len(feature_column_dtypes):
                    raise ValueError(
                        "`feature_columns` and `feature_column_dtypes` "
                        "must have the same length."
                    )
                if any(not subcolumns for subcolumns in feature_columns):
                    raise ValueError("column list may not be empty")

        def make_generator():
            for batch in self.iter_batches(
                batch_size=batch_size,
                batch_format="pandas",
                prefetch_blocks=prefetch_blocks,
                prefetch_batches=prefetch_batches,
                drop_last=drop_last,
                local_shuffle_buffer_size=local_shuffle_buffer_size,
                local_shuffle_seed=local_shuffle_seed,
            ):
                if label_column:
                    label_tensor = convert_pandas_to_torch_tensor(
                        batch,
                        [label_column],
                        label_column_dtype,
                        unsqueeze=unsqueeze_label_tensor,
                    )
                    # Drop the label so that `feature_columns=None` ("all other
                    # columns") does not also pick it up as a feature.
                    batch.pop(label_column)
                else:
                    label_tensor = None

                if isinstance(feature_columns, dict):
                    features_tensor = {
                        key: convert_pandas_to_torch_tensor(
                            batch,
                            feature_columns[key],
                            feature_column_dtypes[key]
                            if isinstance(feature_column_dtypes, dict)
                            else feature_column_dtypes,
                            unsqueeze=unsqueeze_feature_tensors,
                        )
                        for key in feature_columns
                    }
                else:
                    features_tensor = convert_pandas_to_torch_tensor(
                        batch,
                        columns=feature_columns,
                        column_dtypes=feature_column_dtypes,
                        unsqueeze=unsqueeze_feature_tensors,
                    )

                yield (features_tensor, label_tensor)

        return TorchIterableDataset(make_generator)

    def to_tf(
        self,
        feature_columns: Union[str, List[str]],
        label_columns: Union[str, List[str]],
        *,
        prefetch_batches: int = 1,
        batch_size: int = 1,
        drop_last: bool = False,
        local_shuffle_buffer_size: Optional[int] = None,
        local_shuffle_seed: Optional[int] = None,
        feature_type_spec: Optional[
            Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]]
        ] = None,
        label_type_spec: Optional[Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]]] = None,
        # Deprecated.
        prefetch_blocks: int = 0,
    ) -> "tf.data.Dataset":
        """Return a TF Dataset over this dataset.

        .. warning::
            If your dataset contains ragged tensors, this method errors. To prevent
            errors, :ref:`resize your tensors <transforming_tensors>`.

        Examples:
            >>> import ray
            >>> ds = ray.data.read_csv(
            ...     "s3://anonymous@air-example-data/iris.csv"
            ... )
            >>> it = ds.iterator(); it
            DataIterator(Dataset(
               num_blocks=...,
               num_rows=150,
               schema={
                  sepal length (cm): double,
                  sepal width (cm): double,
                  petal length (cm): double,
                  petal width (cm): double,
                  target: int64
               }
            ))

            If your model accepts a single tensor as input, specify a single feature column.

            >>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target")  # doctest: +SKIP
            <_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

            If your model accepts a dictionary as input, specify a list of feature columns.

            >>> it.to_tf(["sepal length (cm)", "sepal width (cm)"], "target")  # doctest: +SKIP
            <_OptionsDataset element_spec=({'sepal length (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), 'sepal width (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal width (cm)')}, TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

            If your dataset contains multiple features but your model accepts a single
            tensor as input, combine features with
            :class:`~ray.data.preprocessors.Concatenator`.

            >>> from ray.data.preprocessors import Concatenator
            >>> preprocessor = Concatenator(output_column_name="features", exclude="target")
            >>> it = preprocessor.transform(ds).iterator()
            >>> it
            DataIterator(Concatenator
            +- Dataset(
                  num_blocks=...,
                  num_rows=150,
                  schema={
                     sepal length (cm): double,
                     sepal width (cm): double,
                     petal length (cm): double,
                     petal width (cm): double,
                     target: int64
                  }
               ))
            >>> it.to_tf("features", "target")  # doctest: +SKIP
            <_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

        Args:
            feature_columns: Columns that correspond to model inputs. If this is a
                string, the input data is a tensor. If this is a list, the input data
                is a ``dict`` that maps column names to their tensor representation.
            label_columns: Columns that correspond to model targets. If this is a
                string, the target data is a tensor. If this is a list, the target data
                is a ``dict`` that maps column names to their tensor representation.
            prefetch_batches: The number of batches to fetch ahead of the current
                batch. If set to greater than 0, a separate threadpool will be used
                to fetch the objects to the local node, format the batches, and apply
                the collate_fn. Defaults to 1.
            batch_size: Record batch size. Defaults to 1.
            drop_last: Set to True to drop the last incomplete batch,
                if the dataset size is not divisible by the batch size. If
                False and the size of dataset is not divisible by the batch
                size, then the last batch will be smaller. Defaults to False.
            local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
                using a local in-memory shuffle buffer, and this value will serve as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer will be drained. This
                buffer size must be greater than or equal to ``batch_size``, and
                therefore ``batch_size`` must also be specified when using local
                shuffling.
            local_shuffle_seed: The seed to use for the local random shuffle.
            feature_type_spec: The `tf.TypeSpec` of `feature_columns`. If there is
                only one column, specify a `tf.TypeSpec`. If there are multiple columns,
                specify a ``dict`` that maps column names to their `tf.TypeSpec`.
                Default is `None` to automatically infer the type of each column.
            label_type_spec: The `tf.TypeSpec` of `label_columns`. If there is
                only one column, specify a `tf.TypeSpec`. If there are multiple columns,
                specify a ``dict`` that maps column names to their `tf.TypeSpec`.
                Default is `None` to automatically infer the type of each column.
            prefetch_blocks: Deprecated; forwarded to ``iter_batches``, which raises
                if it is greater than 0.

        Returns:
            A ``tf.data.Dataset`` that yields inputs and targets.

        Raises:
            ValueError: If tensorflow cannot be imported, or (when a type spec must
                be inferred) a requested column is not in the dataset schema.
        """  # noqa: E501

        from ray.air._internal.tensorflow_utils import (
            convert_ndarray_to_tf_tensor,
            get_type_spec,
        )

        try:
            import tensorflow as tf
        except ImportError as e:
            raise ValueError("tensorflow must be installed!") from e

        def validate_column(column: str) -> None:
            # `valid_columns` is bound below, before this helper is ever called.
            if column not in valid_columns:
                raise ValueError(
                    f"You specified '{column}' in `feature_columns` or "
                    f"`label_columns`, but there's no column named '{column}' in the "
                    f"dataset. Valid column names are: {valid_columns}."
                )

        def validate_columns(columns: Union[str, List]) -> None:
            if isinstance(columns, list):
                for column in columns:
                    validate_column(column)
            else:
                validate_column(columns)

        def convert_batch_to_tensors(
            batch: Dict[str, np.ndarray],
            *,
            columns: Union[str, List[str]],
            type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]],
        ) -> Union[tf.Tensor, Dict[str, tf.Tensor]]:
            # A single column name yields a bare tensor; a list yields a dict.
            if isinstance(columns, str):
                return convert_ndarray_to_tf_tensor(batch[columns], type_spec=type_spec)
            return {
                column: convert_ndarray_to_tf_tensor(
                    batch[column], type_spec=type_spec[column]
                )
                for column in columns
            }

        def generator():
            for batch in self.iter_batches(
                prefetch_batches=prefetch_batches,
                prefetch_blocks=prefetch_blocks,
                batch_size=batch_size,
                drop_last=drop_last,
                local_shuffle_buffer_size=local_shuffle_buffer_size,
                local_shuffle_seed=local_shuffle_seed,
            ):
                assert isinstance(batch, dict)
                features = convert_batch_to_tensors(
                    batch, columns=feature_columns, type_spec=feature_type_spec
                )
                labels = convert_batch_to_tensors(
                    batch, columns=label_columns, type_spec=label_type_spec
                )
                yield features, labels

        if feature_type_spec is None or label_type_spec is None:
            schema = self.schema()
            valid_columns = schema.names
            validate_columns(feature_columns)
            validate_columns(label_columns)
            # Infer only the spec(s) the caller left as None; an explicitly
            # provided spec must not be overwritten by the inferred one.
            if feature_type_spec is None:
                feature_type_spec = get_type_spec(schema, columns=feature_columns)
            if label_type_spec is None:
                label_type_spec = get_type_spec(schema, columns=label_columns)

        dataset = tf.data.Dataset.from_generator(
            generator, output_signature=(feature_type_spec, label_type_spec)
        )

        # Disable TF's automatic sharding of this dataset.
        options = tf.data.Options()
        options.experimental_distribute.auto_shard_policy = (
            tf.data.experimental.AutoShardPolicy.OFF
        )
        return dataset.with_options(options)

    def __del__(self):
        # Clear metrics on deletion in case the iterator was not fully consumed.
        StatsManager.clear_iteration_metrics(self._get_dataset_tag())


# Backwards compatibility alias.
DatasetIterator = DataIterator