From 0367a264e5e5f4fb483a788806a4898d987c13a4 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 1 Feb 2024 10:13:45 -0800 Subject: [PATCH 1/5] Auto-infer tf.TypeSpec of columns in `to_tf` Signed-off-by: Cheng Su --- python/ray/data/dataset.py | 12 +++++++ python/ray/data/iterator.py | 62 ++++++++++++++++--------------------- 2 files changed, 39 insertions(+), 35 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 6eae14e34af8a..6053c8d150664 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4004,6 +4004,8 @@ def to_tf( drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, + feature_type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]] = None, + label_type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]] = None, # Deprecated prefetch_blocks: int = 0, ) -> "tf.data.Dataset": @@ -4088,6 +4090,14 @@ def to_tf( therefore ``batch_size`` must also be specified when using local shuffling. local_shuffle_seed: The seed to use for the local random shuffle. + feature_type_spec: The `tf.TypeSpec` of `feature_columns`. If there is + only one column, specify a `tf.TypeSpec`. If there are multiple columns, + specify a ``dict`` that maps column names to their `tf.TypeSpec`. + Default is `None` to automatically infer the type of each column. + label_type_spec: The `tf.TypeSpec` of `label_columns`. If there is + only one column, specify a `tf.TypeSpec`. If there are multiple columns, + specify a ``dict`` that maps column names to their `tf.TypeSpec`. + Default is `None` to automatically infer the type of each column. Returns: A `TensorFlow Dataset`_ that yields inputs and targets. 
@@ -4107,6 +4117,8 @@ def to_tf( batch_size=batch_size, local_shuffle_buffer_size=local_shuffle_buffer_size, local_shuffle_seed=local_shuffle_seed, + feature_type_spec=feature_type_spec, + label_type_spec=label_type_spec, ) @ConsumptionAPI(pattern="Time complexity:") diff --git a/python/ray/data/iterator.py b/python/ray/data/iterator.py index 64468e24c7b1e..80dceee367ca3 100644 --- a/python/ray/data/iterator.py +++ b/python/ray/data/iterator.py @@ -676,6 +676,8 @@ def to_tf( drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, + feature_type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]] = None, + label_type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]] = None, # Deprecated. prefetch_blocks: int = 0, ) -> "tf.data.Dataset": @@ -761,56 +763,45 @@ def to_tf( therefore ``batch_size`` must also be specified when using local shuffling. local_shuffle_seed: The seed to use for the local random shuffle. + feature_type_spec: The `tf.TypeSpec` of `feature_columns`. If there is + only one column, specify a `tf.TypeSpec`. If there are multiple columns, + specify a ``dict`` that maps column names to their `tf.TypeSpec`. + Default is `None` to automatically infer the type of each column. + label_type_spec: The `tf.TypeSpec` of `label_columns`. If there is + only one column, specify a `tf.TypeSpec`. If there are multiple columns, + specify a ``dict`` that maps column names to their `tf.TypeSpec`. + Default is `None` to automatically infer the type of each column. Returns: A ``tf.data.Dataset`` that yields inputs and targets. 
""" # noqa: E501 - from ray.air._internal.tensorflow_utils import ( - convert_ndarray_to_tf_tensor, - get_type_spec, - ) + from ray.air._internal.tensorflow_utils import convert_ndarray_to_tf_tensor try: import tensorflow as tf except ImportError: raise ValueError("tensorflow must be installed!") - schema = self.schema() - valid_columns = schema.names - - def validate_column(column: str) -> None: - if column not in valid_columns: - raise ValueError( - f"You specified '{column}' in `feature_columns` or " - f"`label_columns`, but there's no column named '{column}' in the " - f"dataset. Valid column names are: {valid_columns}." - ) - - def validate_columns(columns: Union[str, List]) -> None: - if isinstance(columns, list): - for column in columns: - validate_column(column) - else: - validate_column(columns) - - validate_columns(feature_columns) - validate_columns(label_columns) - def convert_batch_to_tensors( batch: Dict[str, np.ndarray], *, columns: Union[str, List[str]], - type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]], + type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]] = None, ) -> Union[tf.Tensor, Dict[str, tf.Tensor]]: if isinstance(columns, str): return convert_ndarray_to_tf_tensor(batch[columns], type_spec=type_spec) - return { - column: convert_ndarray_to_tf_tensor( - batch[column], type_spec=type_spec[column] - ) - for column in columns - } + else: + tensors = {} + for column in columns: + if type_spec is not None: + column_type_spec = type_spec[column] + else: + column_type_spec = None + tensors[column] = convert_ndarray_to_tf_tensor( + batch[column], type_spec=column_type_spec + ) + return tensors def generator(): for batch in self.iter_batches( @@ -830,9 +821,10 @@ def generator(): ) yield features, labels - feature_type_spec = get_type_spec(schema, columns=feature_columns) - label_type_spec = get_type_spec(schema, columns=label_columns) - output_signature = (feature_type_spec, label_type_spec) + if feature_type_spec is not None and 
label_type_spec is not None: + output_signature = (feature_type_spec, label_type_spec) + else: + output_signature = None dataset = tf.data.Dataset.from_generator( generator, output_signature=output_signature From 1dcf4d8750f2abea440a552b5b6b733a2698698f Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 1 Feb 2024 10:48:19 -0800 Subject: [PATCH 2/5] Fix type Signed-off-by: Cheng Su --- python/ray/data/dataset.py | 4 ++-- python/ray/data/iterator.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 6053c8d150664..d36498ec88d41 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4004,8 +4004,8 @@ def to_tf( drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, - feature_type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]] = None, - label_type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]] = None, + feature_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None, + label_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None, # Deprecated prefetch_blocks: int = 0, ) -> "tf.data.Dataset": diff --git a/python/ray/data/iterator.py b/python/ray/data/iterator.py index 80dceee367ca3..e7083cf9f9031 100644 --- a/python/ray/data/iterator.py +++ b/python/ray/data/iterator.py @@ -676,8 +676,8 @@ def to_tf( drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None, - feature_type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]] = None, - label_type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]] = None, + feature_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None, + label_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None, # Deprecated. 
prefetch_blocks: int = 0, ) -> "tf.data.Dataset": From c5db0333ea0f41bc335c0dcaf8bbede6230a6407 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 2 Feb 2024 14:25:48 -0800 Subject: [PATCH 3/5] Change to call `get_type_spec` if type_spec not provided Signed-off-by: Cheng Su --- python/ray/data/iterator.py | 52 ++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/python/ray/data/iterator.py b/python/ray/data/iterator.py index e7083cf9f9031..4965419261dfb 100644 --- a/python/ray/data/iterator.py +++ b/python/ray/data/iterator.py @@ -776,32 +776,45 @@ def to_tf( A ``tf.data.Dataset`` that yields inputs and targets. """ # noqa: E501 - from ray.air._internal.tensorflow_utils import convert_ndarray_to_tf_tensor + from ray.air._internal.tensorflow_utils import ( + convert_ndarray_to_tf_tensor, + get_type_spec, + ) try: import tensorflow as tf except ImportError: raise ValueError("tensorflow must be installed!") + def validate_column(column: str) -> None: + if column not in valid_columns: + raise ValueError( + f"You specified '{column}' in `feature_columns` or " + f"`label_columns`, but there's no column named '{column}' in the " + f"dataset. Valid column names are: {valid_columns}." 
+ ) + + def validate_columns(columns: Union[str, List]) -> None: + if isinstance(columns, list): + for column in columns: + validate_column(column) + else: + validate_column(columns) + def convert_batch_to_tensors( batch: Dict[str, np.ndarray], *, columns: Union[str, List[str]], - type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]] = None, + type_spec: Union[tf.TypeSpec, Dict[str, tf.TypeSpec]], ) -> Union[tf.Tensor, Dict[str, tf.Tensor]]: if isinstance(columns, str): return convert_ndarray_to_tf_tensor(batch[columns], type_spec=type_spec) - else: - tensors = {} - for column in columns: - if type_spec is not None: - column_type_spec = type_spec[column] - else: - column_type_spec = None - tensors[column] = convert_ndarray_to_tf_tensor( - batch[column], type_spec=column_type_spec - ) - return tensors + return { + column: convert_ndarray_to_tf_tensor( + batch[column], type_spec=type_spec[column] + ) + for column in columns + } def generator(): for batch in self.iter_batches( @@ -821,13 +834,16 @@ def generator(): ) yield features, labels - if feature_type_spec is not None and label_type_spec is not None: - output_signature = (feature_type_spec, label_type_spec) - else: - output_signature = None + if feature_type_spec is None or label_type_spec is None: + schema = self.schema() + valid_columns = schema.names + validate_columns(feature_columns) + validate_columns(label_columns) + feature_type_spec = get_type_spec(schema, columns=feature_columns) + label_type_spec = get_type_spec(schema, columns=label_columns) dataset = tf.data.Dataset.from_generator( - generator, output_signature=output_signature + generator, output_signature=(feature_type_spec, label_type_spec) ) options = tf.data.Options() From 1992bc1d406c492388b0c5e363a0c7799157128a Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 7 Feb 2024 11:23:10 -0800 Subject: [PATCH 4/5] Add unit test Signed-off-by: Cheng Su --- python/ray/data/tests/test_tf.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) 
diff --git a/python/ray/data/tests/test_tf.py b/python/ray/data/tests/test_tf.py index 39b7445861190..43819614ce0fd 100644 --- a/python/ray/data/tests/test_tf.py +++ b/python/ray/data/tests/test_tf.py @@ -31,6 +31,26 @@ def test_element_spec_type(self): assert isinstance(feature_spec, tf.TypeSpec) assert isinstance(label_spec, tf.TypeSpec) + def test_element_spec_user_provided(self): + ds = ray.data.from_items([{"spam": 0, "ham": 0, "eggs": 0}]) + + dataset1 = ds.to_tf(feature_columns=["spam", "ham"], label_columns="eggs") + feature_spec, label_spec = dataset1.element_spec + dataset2 = ds.to_tf( + feature_columns=["spam", "ham"], + label_columns="eggs", + feature_type_spec=feature_spec, + label_spec=label_spec, + ) + feature_output_spec, label_output_spec = dataset2.element_spec + assert isinstance(label_output_spec, tf.TypeSpec) + assert isinstance(feature_output_spec, dict) + assert feature_output_spec.keys() == {"spam", "ham"} + assert all( + isinstance(value, tf.TypeSpec) + for value in feature_output_spec.values() + ) + def test_element_spec_type_with_multiple_columns(self): ds = ray.data.from_items([{"spam": 0, "ham": 0, "eggs": 0}]) From cad3d4ff5bcf511314ae2c8ca345bac203b92726 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 7 Feb 2024 12:15:35 -0800 Subject: [PATCH 5/5] Fix test Signed-off-by: Cheng Su --- python/ray/data/tests/test_tf.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/ray/data/tests/test_tf.py b/python/ray/data/tests/test_tf.py index 43819614ce0fd..e66f4f4447638 100644 --- a/python/ray/data/tests/test_tf.py +++ b/python/ray/data/tests/test_tf.py @@ -40,15 +40,14 @@ def test_element_spec_user_provided(self): feature_columns=["spam", "ham"], label_columns="eggs", feature_type_spec=feature_spec, - label_spec=label_spec, + label_type_spec=label_spec, ) feature_output_spec, label_output_spec = dataset2.element_spec assert isinstance(label_output_spec, tf.TypeSpec) assert isinstance(feature_output_spec, dict) 
assert feature_output_spec.keys() == {"spam", "ham"} assert all( - isinstance(value, tf.TypeSpec) - for value in feature_output_spec.values() + isinstance(value, tf.TypeSpec) for value in feature_output_spec.values() ) def test_element_spec_type_with_multiple_columns(self):