From 496009ec369f8d78311afde2b62123596122f076 Mon Sep 17 00:00:00 2001 From: Travis Addair Date: Wed, 25 Nov 2020 17:46:30 -0800 Subject: [PATCH] Added Backend interface for abstracting DataFrame preprocessing steps (#1014) Co-authored-by: w4nderlust --- .gitignore | 4 +- ludwig/api.py | 327 ++++++++-------- ludwig/backend/__init__.py | 35 ++ ludwig/backend/base.py | 88 +++++ ludwig/data/concatenate_datasets.py | 57 +-- ludwig/data/dataframe/__init__.py | 16 + ludwig/data/dataframe/base.py | 67 ++++ ludwig/data/dataframe/pandas.py | 69 ++++ ludwig/data/dataset.py | 39 +- ludwig/data/preprocessing.py | 362 +++++++++++------- ludwig/features/audio_feature.py | 178 +++++---- ludwig/features/bag_feature.py | 41 +- ludwig/features/base_feature.py | 6 +- ludwig/features/binary_feature.py | 20 +- ludwig/features/category_feature.py | 42 +- ludwig/features/date_feature.py | 27 +- ludwig/features/h3_feature.py | 22 +- ludwig/features/image_feature.py | 55 +-- ludwig/features/numerical_feature.py | 31 +- ludwig/features/sequence_feature.py | 31 +- ludwig/features/set_feature.py | 51 ++- ludwig/features/text_feature.py | 45 ++- ludwig/features/timeseries_feature.py | 65 ++-- ludwig/features/vector_feature.py | 24 +- ludwig/models/predictor.py | 12 +- ludwig/models/trainer.py | 11 +- ludwig/utils/audio_utils.py | 8 + ludwig/utils/batcher.py | 2 +- ludwig/utils/data_utils.py | 148 +++---- ludwig/utils/misc_utils.py | 11 +- ludwig/utils/strings_utils.py | 61 ++- requirements.txt | 1 - tests/integration_tests/test_api.py | 58 +-- tests/integration_tests/test_experiment.py | 110 +----- tests/integration_tests/test_kfold_cv.py | 3 +- .../test_model_save_and_load.py | 4 +- .../test_sequence_features.py | 7 +- .../test_visualization_api.py | 5 +- tests/integration_tests/utils.py | 164 +++++++- tests/ludwig/utils/test_normalization.py | 25 +- 40 files changed, 1438 insertions(+), 894 deletions(-) create mode 100644 ludwig/backend/__init__.py create mode 100644 ludwig/backend/base.py create mode 100644 ludwig/data/dataframe/__init__.py create mode 100644 ludwig/data/dataframe/base.py create mode 100644 ludwig/data/dataframe/pandas.py diff --git a/.gitignore b/.gitignore index e530aeeb59c..cc4cd04a26d 100644 --- a/.gitignore +++ b/.gitignore @@ -35,7 +35,7 @@ build/ develop-eggs/ dist/ downloads/ -dataset/ +./dataset/ eggs/ .eggs/ lib/ @@ -140,4 +140,4 @@ tags # examples examples/*/data/ -examples/*/visualizations/ \ No newline at end of file +examples/*/visualizations/ diff --git a/ludwig/api.py b/ludwig/api.py index 3f455e76fd5..d875d690b88 100644 --- a/ludwig/api.py +++ b/ludwig/api.py @@ -34,6 +34,7 @@ import numpy as np import pandas as pd import yaml +from ludwig.backend import LOCAL_BACKEND, Backend, create_backend from ludwig.constants import FULL, PREPROCESSING, TEST, TRAINING, VALIDATION from ludwig.contrib import contrib_command from ludwig.data.dataset import Dataset @@ -145,6 +146,7 @@ def __init__( self, config: Union[str, dict], logging_level: int = logging.ERROR, + backend: Union[Backend, str] = LOCAL_BACKEND, use_horovod: bool = None, gpus: Union[str, int, List[int]] = None, gpu_memory_limit: int = None, @@ -158,6 +160,8 @@ def __init__( :param config: (Union[str, dict]) in-memory representation of config or string path to a YAML config file. :param logging_level: (int) Log level that will be sent to stderr. + :param backend: (Union[Backend, str]) `Backend` or string name + of backend to use to execute preprocessing / training steps. 
:param use_horovod: (bool) use Horovod for distributed training. Will be set automatically if `horovodrun` is used to launch the training script. @@ -185,6 +189,10 @@ def __init__( # merge config with defaults self.config = merge_with_defaults(config_dict) + self.backend = backend + if isinstance(backend, str): + self.backend = create_backend(backend) + # setup horovod self._horovod = configure_horovod(use_horovod) @@ -205,9 +213,9 @@ def __init__( def train( self, dataset: Union[str, dict, pd.DataFrame] = None, - training_set: Union[str, dict, pd.DataFrame] = None, - validation_set: Union[str, dict, pd.DataFrame] = None, - test_set: Union[str, dict, pd.DataFrame] = None, + training_set: Union[str, dict, pd.DataFrame, Dataset] = None, + validation_set: Union[str, dict, pd.DataFrame, Dataset] = None, + test_set: Union[str, dict, pd.DataFrame, Dataset] = None, training_set_metadata: Union[str, dict] = None, data_format: str = None, experiment_name: str = 'api_experiment', @@ -360,172 +368,184 @@ def train( description_fn, training_stats_fn, model_dir = get_file_names( output_directory) - # save description - if is_on_master(): - description = get_experiment_description( - self.config, - dataset=dataset, - training_set=training_set, - validation_set=validation_set, - test_set=test_set, - training_set_metadata=training_set_metadata, - data_format=data_format, - random_seed=random_seed - ) - if not skip_save_training_description: - save_json(description_fn, description) - # print description - logger.info('Experiment name: {}'.format(experiment_name)) - logger.info('Model name: {}'.format(model_name)) - logger.info('Output directory: {}'.format(output_directory)) - logger.info('\n') - for key, value in description.items(): - logger.info('{}: {}'.format(key, pformat(value, indent=4))) - logger.info('\n') + with self.backend.create_cache_dir(): + if isinstance(training_set, Dataset) and training_set_metadata is not None: + preprocessed_data = (training_set, validation_set, test_set, training_set_metadata) + else: + # save description + if is_on_master(): + description = get_experiment_description( + self.config, + dataset=dataset, + training_set=training_set, + validation_set=validation_set, + test_set=test_set, + training_set_metadata=training_set_metadata, + data_format=data_format, + random_seed=random_seed + ) + if not skip_save_training_description: + save_json(description_fn, description) + # print description + logger.info('Experiment name: {}'.format(experiment_name)) + logger.info('Model name: {}'.format(model_name)) + logger.info('Output directory: {}'.format(output_directory)) + logger.info('\n') + for key, value in description.items(): + logger.info('{}: {}'.format(key, pformat(value, indent=4))) + logger.info('\n') + + preprocessed_data = self.preprocess( + dataset=dataset, + training_set=training_set, + validation_set=validation_set, + test_set=test_set, + training_set_metadata=training_set_metadata, + data_format=data_format, + experiment_name=experiment_name, + model_name=model_name, + model_resume_path=model_resume_path, + skip_save_training_description=skip_save_training_description, + skip_save_training_statistics=skip_save_training_statistics, + skip_save_model=skip_save_model, + skip_save_progress=skip_save_progress, + skip_save_log=skip_save_log, + skip_save_processed_input=skip_save_processed_input, + output_directory=output_directory, + random_seed=random_seed, + devbug=debug, + **kwargs, + ) + (training_set, + validation_set, + test_set, + training_set_metadata) = 
preprocessed_data - # preprocess - preprocessed_data = preprocess_for_training( - self.config, - dataset=dataset, - training_set=training_set, - validation_set=validation_set, - test_set=test_set, - training_set_metadata=training_set_metadata, - data_format=data_format, - skip_save_processed_input=skip_save_processed_input, - preprocessing_params=self.config[PREPROCESSING], - random_seed=random_seed - ) + self.training_set_metadata = training_set_metadata - (training_set, - validation_set, - test_set, - training_set_metadata) = preprocessed_data - self.training_set_metadata = training_set_metadata + if is_on_master(): + logger.info('Training set: {0}'.format(len(training_set))) + if validation_set is not None: + logger.info('Validation set: {0}'.format(len(validation_set))) + if test_set is not None: + logger.info('Test set: {0}'.format(len(test_set))) - if is_on_master(): - logger.info('Training set: {0}'.format(training_set.size)) - if validation_set is not None: - logger.info('Validation set: {0}'.format(validation_set.size)) - if test_set is not None: - logger.info('Test set: {0}'.format(test_set.size)) + if is_on_master(): + if not skip_save_model: + # save train set metadata + os.makedirs(model_dir, exist_ok=True) + save_json( + os.path.join( + model_dir, + TRAIN_SET_METADATA_FILE_NAME + ), + training_set_metadata + ) - if is_on_master(): - if not skip_save_model: - # save train set metadata - os.makedirs(model_dir, exist_ok=True) - save_json( - os.path.join( - model_dir, - TRAIN_SET_METADATA_FILE_NAME - ), + contrib_command("train_init", experiment_directory=output_directory, + experiment_name=experiment_name, model_name=model_name, + output_directory=output_directory, + resume=model_resume_path is not None) + + # Build model if not provided + # if it was provided it means it was already loaded + if not self.model: + if is_on_master(): + print_boxed('MODEL', print_fun=logger.debug) + # update config with metadata properties + update_config_with_metadata( + self.config, training_set_metadata ) + self.model = LudwigModel.create_model(self.config, + random_seed=random_seed) - contrib_command("train_init", experiment_directory=output_directory, - experiment_name=experiment_name, model_name=model_name, - output_directory=output_directory, - resume=model_resume_path is not None) - - # Build model if not provided - # if it was provided it means it was already loaded - if not self.model: - if is_on_master(): - print_boxed('MODEL', print_fun=logger.debug) - # update config with metadata properties - update_config_with_metadata( - self.config, - training_set_metadata + # init trainer + trainer = Trainer( + **self.config[TRAINING], + resume=model_resume_path is not None, + skip_save_model=skip_save_model, + skip_save_progress=skip_save_progress, + skip_save_log=skip_save_log, + random_seed=random_seed, + horoovd=self._horovod, + debug=debug ) - self.model = LudwigModel.create_model(self.config, - random_seed=random_seed) - # init trainer - trainer = Trainer( - **self.config[TRAINING], - resume=model_resume_path is not None, - skip_save_model=skip_save_model, - skip_save_progress=skip_save_progress, - skip_save_log=skip_save_log, - random_seed=random_seed, - horoovd=self._horovod, - debug=debug - ) - - contrib_command("train_model", self.model, self.config, - self.config_fp) + contrib_command("train_model", self.model, self.config, + self.config_fp) - # train model - if is_on_master(): - print_boxed('TRAINING') - if not skip_save_model: - self.save_config(model_dir) - - train_stats = 
trainer.train( - self.model, - training_set, - validation_set=validation_set, - test_set=test_set, - save_path=model_dir, - ) - - train_trainset_stats, train_valiset_stats, train_testset_stats = train_stats - train_stats = { - TRAINING: train_trainset_stats, - VALIDATION: train_valiset_stats, - TEST: train_testset_stats - } + # train model + if is_on_master(): + print_boxed('TRAINING') + if not skip_save_model: + self.save_config(model_dir) - # save training statistics - if is_on_master(): - if not skip_save_training_statistics: - save_json(training_stats_fn, train_stats) - - # grab the results of the model with highest validation test performance - validation_field = trainer.validation_field - validation_metric = trainer.validation_metric - validation_field_result = train_valiset_stats[validation_field] - - best_function = get_best_function(validation_metric) - # results of the model with highest validation test performance - if is_on_master() and validation_set is not None: - epoch_best_vali_metric, best_vali_metric = best_function( - enumerate(validation_field_result[validation_metric]), - key=lambda pair: pair[1] - ) - logger.info( - 'Best validation model epoch: {0}'.format( - epoch_best_vali_metric + 1) + train_stats = trainer.train( + self.model, + training_set, + validation_set=validation_set, + test_set=test_set, + save_path=model_dir, ) - logger.info( - 'Best validation model {0} on validation set {1}: {2}'.format( - validation_metric, validation_field, best_vali_metric - )) - if test_set is not None: - best_vali_metric_epoch_test_metric = train_testset_stats[ - validation_field][validation_metric][ - epoch_best_vali_metric] + train_trainset_stats, train_valiset_stats, train_testset_stats = train_stats + train_stats = { + TRAINING: train_trainset_stats, + VALIDATION: train_valiset_stats, + TEST: train_testset_stats + } + + # save training statistics + if is_on_master(): + if not skip_save_training_statistics: + save_json(training_stats_fn, train_stats) + + # grab the results of the model with highest validation test performance + validation_field = trainer.validation_field + validation_metric = trainer.validation_metric + validation_field_result = train_valiset_stats[validation_field] + + best_function = get_best_function(validation_metric) + # results of the model with highest validation test performance + if is_on_master() and validation_set is not None: + epoch_best_vali_metric, best_vali_metric = best_function( + enumerate(validation_field_result[validation_metric]), + key=lambda pair: pair[1] + ) logger.info( - 'Best validation model {0} on test set {1}: {2}'.format( - validation_metric, - validation_field, - best_vali_metric_epoch_test_metric - ) + 'Best validation model epoch: {0}'.format( + epoch_best_vali_metric + 1) ) - logger.info( - '\nFinished: {0}_{1}'.format(experiment_name, model_name)) - logger.info('Saved to: {0}'.format(output_directory)) + logger.info( + 'Best validation model {0} on validation set {1}: {2}'.format( + validation_metric, validation_field, best_vali_metric + )) + if test_set is not None: + best_vali_metric_epoch_test_metric = train_testset_stats[ + validation_field][validation_metric][ + epoch_best_vali_metric] + + logger.info( + 'Best validation model {0} on test set {1}: {2}'.format( + validation_metric, + validation_field, + best_vali_metric_epoch_test_metric + ) + ) + logger.info( + '\nFinished: {0}_{1}'.format(experiment_name, model_name)) + logger.info('Saved to: {0}'.format(output_directory)) - contrib_command("train_save", 
output_directory) + contrib_command("train_save", output_directory) - self.training_set_metadata = training_set_metadata + self.training_set_metadata = training_set_metadata - if not skip_save_model: - # Load the best weights from saved checkpoint - self.load_weights(model_dir) + if not skip_save_model: + # Load the best weights from saved checkpoint + self.load_weights(model_dir) - return train_stats, preprocessed_data, output_directory + return train_stats, preprocessed_data, output_directory def train_online( self, @@ -1239,6 +1259,7 @@ def preprocess( data_format=data_format, skip_save_processed_input=skip_save_processed_input, preprocessing_params=self.config[PREPROCESSING], + backend=self.backend, random_seed=random_seed ) @@ -1521,6 +1542,7 @@ def kfold_cross_validate( gpus: Union[str, int, List[int]] = None, gpu_memory_limit: int = None, allow_parallel_threads: bool = True, + backend: Union[Backend, str] = LOCAL_BACKEND, use_horovod: bool = None, logging_level: int = logging.INFO, debug: bool = False, @@ -1595,6 +1617,8 @@ def kfold_cross_validate( :param allow_parallel_threads: (bool, default: `True`) allow TensorFlow to use multithreading parallelism to improve performance at the cost of determinism. + :param backend: (Union[Backend, str]) `Backend` or string name + of backend to use to execute preprocessing / training steps. :param use_horovod: (bool, default: `None`) flag for using horovod :param debug: (bool, default: `False`) If `True` turns on tfdbg with `inf_or_nan` checks. @@ -1645,7 +1669,7 @@ def kfold_cross_validate( elif data_format in CACHEABLE_FORMATS: data_reader = get_from_registry(data_format, external_data_reader_registry) - data_df = data_reader(dataset) + data_df = data_reader(dataset, backend.df_engine.df_lib) data_dir = os.path.dirname(dataset) else: ValueError( @@ -1673,6 +1697,7 @@ def kfold_cross_validate( model = LudwigModel( config=config, logging_level=logging_level, + backend=backend, use_horovod=use_horovod, gpus=gpus, gpu_memory_limit=gpu_memory_limit, diff --git a/ludwig/backend/__init__.py b/ludwig/backend/__init__.py new file mode 100644 index 00000000000..ddb5c685ee3 --- /dev/null +++ b/ludwig/backend/__init__.py @@ -0,0 +1,35 @@ +#! /usr/bin/env python +# coding=utf-8 +# Copyright (c) 2020 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +from ludwig.backend.base import Backend, LocalBackend + + +LOCAL_BACKEND = LocalBackend() + + +def get_local_backend(): + return LOCAL_BACKEND + + +backend_registry = { + 'local': get_local_backend, + None: get_local_backend, +} + + +def create_backend(name): + return backend_registry[name]() diff --git a/ludwig/backend/base.py b/ludwig/backend/base.py new file mode 100644 index 00000000000..a76af2a6ecb --- /dev/null +++ b/ludwig/backend/base.py @@ -0,0 +1,88 @@ +#! /usr/bin/env python +# coding=utf-8 +# Copyright (c) 2020 Uber Technologies, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +import os +import tempfile +import uuid + +from abc import ABC, abstractmethod +from contextlib import contextmanager + +from ludwig.data.dataframe.pandas import PandasEngine + + +class Backend(ABC): + def __init__(self, cache_dir=None): + self._cache_dir = cache_dir + + @property + @abstractmethod + def df_engine(self): + raise NotImplementedError() + + @property + @abstractmethod + def supports_multiprocessing(self): + raise NotImplementedError() + + @abstractmethod + def check_lazy_load_supported(self, feature): + raise NotImplementedError() + + @property + def cache_enabled(self): + return self._cache_dir is not None + + def create_cache_entry(self): + return os.path.join(self.cache_dir, str(uuid.uuid1())) + + @property + def cache_dir(self): + if not self._cache_dir: + raise ValueError('Cache directory not available, try calling `with backend.create_cache_dir()`.') + return self._cache_dir + + @contextmanager + def create_cache_dir(self): + prev_cache_dir = self._cache_dir + try: + if self._cache_dir: + os.makedirs(self._cache_dir, exist_ok=True) + yield self._cache_dir + else: + with tempfile.TemporaryDirectory() as tmpdir: + self._cache_dir = tmpdir + yield tmpdir + finally: + self._cache_dir = prev_cache_dir + + +class LocalBackend(Backend): + def __init__(self): + super().__init__() + self._df_engine = PandasEngine() + + @property + def df_engine(self): + return self._df_engine + + @property + def supports_multiprocessing(self): + return True + + def check_lazy_load_supported(self, feature): + pass diff --git a/ludwig/data/concatenate_datasets.py b/ludwig/data/concatenate_datasets.py index 8b5d7c27859..81d804f5db4 100644 --- a/ludwig/data/concatenate_datasets.py +++ b/ludwig/data/concatenate_datasets.py @@ -18,53 +18,66 @@ import logging import numpy as np -import pandas as pd +from ludwig.backend import LOCAL_BACKEND +from ludwig.constants import SPLIT from ludwig.utils.data_utils import read_csv logger = logging.getLogger(__name__) -def concatenate(train_csv, vali_csv, test_csv, output_csv): - concatenated_df = concatenate_datasets(train_csv, vali_csv, test_csv) +def concatenate_csv(train_csv, vali_csv, test_csv, output_csv): + concatenated_df = concatenate_files( + train_csv, vali_csv, test_csv, read_csv, LOCAL_BACKEND + ) logger.info('Saving concatenated dataset as csv..') concatenated_df.to_csv(output_csv, encoding='utf-8', index=False) logger.info('done') -def concatenate_datasets(training_set, Validation_set, test_set, - read_fn=read_csv): - logger.info('Loading training dataset...') - train_df = read_fn(training_set) +def concatenate_files(train_fname, vali_fname, test_fname, read_fn, backend): + df_lib = backend.df_engine.df_lib + + logger.info('Loading training file...') + train_df = read_fn(train_fname, df_lib) logger.info('done') - logger.info('Loading validation dataset..') - vali_df = read_fn(Validation_set) if Validation_set is not None 
else None + logger.info('Loading validation file..') + vali_df = read_fn(vali_fname, df_lib) if vali_fname is not None else None logger.info('done') - logger.info('Loading test dataset..') - test_df = read_fn(test_set) if test_set is not None else None + logger.info('Loading test file..') + test_df = read_fn(test_fname, df_lib) if test_fname is not None else None logger.info('done') - logger.info('Concatenating datasets..') - concatenated_df = concatenate_df(train_df, vali_df, test_df) + logger.info('Concatenating files..') + concatenated_df = concatenate_df(train_df, vali_df, test_df, backend) logger.info('done') return concatenated_df -def concatenate_df(train_df, vali_df, test_df): +def concatenate_df(train_df, vali_df, test_df, backend): train_size = len(train_df) vali_size = len(vali_df) if vali_df is not None else 0 - test_size = len(test_df) if test_df is not None else 0 - concatenated_df = pd.concat([train_df, vali_df, test_df], - ignore_index=True) - split = np.array( - [0] * train_size + [1] * vali_size + [2] * test_size, - dtype=np.int8 + + concatenated_df = backend.df_engine.df_lib.concat( + [df for df in [train_df, vali_df, test_df] if df is not None], + ignore_index=True ) - concatenated_df = concatenated_df.assign(split=pd.Series(split).values) + + def get_split(idx): + if idx < train_size: + return 0 + if idx < train_size + vali_size: + return 1 + return 2 + + concatenated_df[SPLIT] = concatenated_df.index.to_series().map( + get_split + ).astype(np.int8) + return concatenated_df @@ -92,4 +105,4 @@ def concatenate_df(train_df, vali_df, test_df): parser.add_argument('-o', '--output_csv', help='output csv') args = parser.parse_args() - concatenate(args.train_csv, args.vali_csv, args.test_csv, args.output_csv) + concatenate_csv(args.train_csv, args.vali_csv, args.test_csv, args.output_csv) diff --git a/ludwig/data/dataframe/__init__.py b/ludwig/data/dataframe/__init__.py new file mode 100644 index 00000000000..1eaad157e61 --- /dev/null +++ b/ludwig/data/dataframe/__init__.py @@ -0,0 +1,16 @@ +#! /usr/bin/env python +# coding=utf-8 +# Copyright (c) 2020 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== diff --git a/ludwig/data/dataframe/base.py b/ludwig/data/dataframe/base.py new file mode 100644 index 00000000000..63dcf65b675 --- /dev/null +++ b/ludwig/data/dataframe/base.py @@ -0,0 +1,67 @@ +#! /usr/bin/env python +# coding=utf-8 +# Copyright (c) 2020 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +from abc import ABC, abstractmethod + + +class DataFrameEngine(ABC): + @abstractmethod + def empty_df_like(self, df): + raise NotImplementedError() + + @abstractmethod + def parallelize(self, data): + raise NotImplementedError() + + @abstractmethod + def persist(self, data): + raise NotImplementedError() + + @abstractmethod + def compute(self, data): + raise NotImplementedError() + + @abstractmethod + def from_pandas(self, df): + raise NotImplementedError() + + @abstractmethod + def map_objects(self, series, map_fn): + raise NotImplementedError() + + @abstractmethod + def reduce_objects(self, series, reduce_fn): + raise NotImplementedError() + + @abstractmethod + def create_dataset(self, dataset, tag, config, training_set_metadata): + raise NotImplementedError() + + @property + @abstractmethod + def array_lib(self): + raise NotImplementedError() + + @property + @abstractmethod + def df_lib(self): + raise NotImplementedError() + + @property + @abstractmethod + def use_hdf5_cache(self): + raise NotImplementedError() diff --git a/ludwig/data/dataframe/pandas.py b/ludwig/data/dataframe/pandas.py new file mode 100644 index 00000000000..ff10f13261f --- /dev/null +++ b/ludwig/data/dataframe/pandas.py @@ -0,0 +1,69 @@ +#! /usr/bin/env python +# coding=utf-8 +# Copyright (c) 2020 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== + +import numpy as np +import pandas as pd + +from ludwig.data.dataset import Dataset +from ludwig.data.dataframe.base import DataFrameEngine +from ludwig.utils.data_utils import DATA_TRAIN_HDF5_FP +from ludwig.utils.misc_utils import get_proc_features + + +class PandasEngine(DataFrameEngine): + def empty_df_like(self, df): + return pd.DataFrame(index=df.index) + + def parallelize(self, data): + return data + + def persist(self, data): + return data + + def compute(self, data): + return data + + def from_pandas(self, df): + return df + + def map_objects(self, series, map_fn): + return series.map(map_fn) + + def reduce_objects(self, series, reduce_fn): + return reduce_fn(series) + + def create_dataset(self, dataset, tag, config, training_set_metadata): + return Dataset( + dataset, + get_proc_features(config), + training_set_metadata.get(DATA_TRAIN_HDF5_FP) + ) + + @property + def array_lib(self): + return np + + @property + def df_lib(self): + return pd + + @property + def use_hdf5_cache(self): + return True + + +PANDAS = PandasEngine() diff --git a/ludwig/data/dataset.py b/ludwig/data/dataset.py index 53286cf37f7..7139237629c 100644 --- a/ludwig/data/dataset.py +++ b/ludwig/data/dataset.py @@ -17,26 +17,18 @@ import h5py import numpy as np -from ludwig.constants import PREPROCESSING, PROC_COLUMN +from ludwig.constants import PREPROCESSING +from ludwig.data.sampler import DistributedSampler +from ludwig.utils.batcher import Batcher +from ludwig.utils.data_utils import to_numpy_dataset class Dataset: - def __init__(self, dataset, input_features, output_features, data_hdf5_fp): - self.dataset = dataset - - self.size = min(map(len, self.dataset.values())) - - self.input_features = {} - for feature in input_features: - proc_column = feature[PROC_COLUMN] - self.input_features[proc_column] = feature - self.output_features = {} - for feature in output_features: - proc_column = feature[PROC_COLUMN] - self.output_features[proc_column] = feature - self.features = self.input_features.copy() - self.features.update(self.output_features) + def __init__(self, dataset, features, data_hdf5_fp): + self.features = features self.data_hdf5_fp = data_hdf5_fp + self.size = len(dataset) + self.dataset = to_numpy_dataset(dataset) def get(self, proc_column, idx=None): if idx is None: @@ -67,3 +59,18 @@ def get_dataset(self): def __len__(self): return self.size + + def initialize_batcher(self, batch_size=128, + should_shuffle=True, + seed=0, + ignore_last=False, + horovod=None): + sampler = DistributedSampler(len(self), + shuffle=should_shuffle, + seed=seed, + horovod=horovod) + batcher = Batcher(self, + sampler, + batch_size=batch_size, + ignore_last=ignore_last) + return batcher diff --git a/ludwig/data/preprocessing.py b/ludwig/data/preprocessing.py index 8600d21ecc0..5a651efbcac 100644 --- a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -17,23 +17,24 @@ import logging import os from abc import ABC, abstractmethod +from copy import copy -import h5py +import ludwig import numpy as np import pandas as pd - -import ludwig +from ludwig.backend import LOCAL_BACKEND from ludwig.constants import * from ludwig.constants import TEXT -from ludwig.data.concatenate_datasets import concatenate_datasets, \ - concatenate_df +from ludwig.data.concatenate_datasets import concatenate_files, concatenate_df from ludwig.data.dataset import Dataset from ludwig.features.feature_registries import (base_type_registry, 
input_type_registry) from ludwig.features.feature_utils import compute_feature_hash from ludwig.utils import data_utils from ludwig.utils.data_utils import (CACHEABLE_FORMATS, CSV_FORMATS, - DATA_TRAIN_HDF5_FP, DATAFRAME_FORMATS, + DATA_PROCESSED_CACHE_DIR, + DATA_TRAIN_HDF5_FP, + DATAFRAME_FORMATS, DICT_FORMATS, EXCEL_FORMATS, FEATHER_FORMATS, FWF_FORMATS, HDF5_FORMATS, HTML_FORMATS, JSON_FORMATS, @@ -47,15 +48,14 @@ read_html, read_json, read_jsonl, read_orc, read_parquet, read_pickle, read_sas, read_spss, read_stata, read_tsv, - replace_file_extension, split_dataset_ttv, - text_feature_data_field) + replace_file_extension, split_dataset_ttv) from ludwig.utils.data_utils import save_array, get_split_path from ludwig.utils.defaults import (default_preprocessing_parameters, default_random_seed) from ludwig.utils.horovod_utils import is_on_master from ludwig.utils.misc_utils import (get_from_registry, merge_dict, resolve_pointers, set_random_seed, - hash_dict) + hash_dict, get_proc_features_from_lists) logger = logging.getLogger(__name__) @@ -73,6 +73,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): pass @@ -83,7 +84,8 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): pass @@ -99,6 +101,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): num_overrides = override_in_memory_flag(features, True) @@ -108,14 +111,15 @@ def preprocess_for_training( 'with {} data format.'.format('dict') ) + df_engine = backend.df_engine if dataset is not None: - dataset = pd.DataFrame(dataset) + dataset = df_engine.from_pandas(pd.DataFrame(dataset)) if training_set_metadata is not None: - training_set = pd.DataFrame(training_set) + training_set = df_engine.from_pandas(pd.DataFrame(training_set)) if validation_set is not None: - validation_set = pd.DataFrame(validation_set) + validation_set = df_engine.from_pandas(pd.DataFrame(validation_set)) if test_set is not None: - test_set = pd.DataFrame(test_set) + test_set = df_engine.from_pandas(pd.DataFrame(test_set)) return _preprocess_df_for_training( features, @@ -125,6 +129,7 @@ def preprocess_for_training( test_set, training_set_metadata=training_set_metadata, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -133,13 +138,15 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): dataset, training_set_metadata = build_dataset( pd.DataFrame(dataset), features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -155,6 +162,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): num_overrides = override_in_memory_flag(features, True) @@ -172,6 +180,7 @@ def preprocess_for_training( test_set, training_set_metadata=training_set_metadata, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -180,13 +189,15 @@ def preprocess_for_prediction( dataset, features, 
preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): dataset, training_set_metadata = build_dataset( dataset, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -202,6 +213,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -214,6 +226,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -222,7 +235,8 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): dataset_df = read_csv(dataset) dataset_df.src = dataset @@ -230,7 +244,8 @@ def preprocess_for_prediction( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -246,6 +261,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -258,6 +274,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -266,7 +283,8 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): dataset_df = read_tsv(dataset) dataset_df.src = dataset @@ -274,7 +292,8 @@ def preprocess_for_prediction( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -290,6 +309,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -302,6 +322,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -310,15 +331,17 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): - dataset_df = read_json(dataset) + dataset_df = read_json(dataset, backend.df_engine.df_lib) dataset_df.src = dataset dataset, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -334,6 +357,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -346,6 +370,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, 
skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -354,15 +379,17 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): - dataset_df = read_jsonl(dataset) + dataset_df = read_jsonl(dataset, backend.df_engine.df_lib) dataset_df.src = dataset dataset, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -378,6 +405,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -390,6 +418,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -398,15 +427,17 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): - dataset_df = read_excel(dataset) + dataset_df = read_excel(dataset, backend.df_engine.df_lib) dataset_df.src = dataset dataset, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -422,6 +453,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -434,6 +466,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -442,15 +475,17 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): - dataset_df = read_parquet(dataset) + dataset_df = read_parquet(dataset, backend.df_engine.df_lib) dataset_df.src = dataset dataset, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -466,6 +501,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -478,6 +514,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -486,15 +523,17 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): - dataset_df = read_pickle(dataset) + dataset_df = read_pickle(dataset, backend.df_engine.df_lib) dataset_df.src = dataset dataset, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + 
metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -510,6 +549,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -522,6 +562,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -530,15 +571,17 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): - dataset_df = read_feather(dataset) + dataset_df = read_feather(dataset, backend.df_engine.df_lib) dataset_df.src = dataset dataset, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -554,6 +597,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -566,6 +610,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -574,15 +619,17 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): - dataset_df = read_fwf(dataset) + dataset_df = read_fwf(dataset, backend.df_engine.df_lib) dataset_df.src = dataset dataset, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -598,6 +645,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -610,6 +658,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -618,15 +667,17 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): - dataset_df = read_html(dataset) + dataset_df = read_html(dataset, backend.df_engine.df_lib) dataset_df.src = dataset dataset, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -642,6 +693,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -654,6 +706,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + 
backend=backend, random_seed=random_seed ) @@ -662,15 +715,17 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): - dataset_df = read_orc(dataset) + dataset_df = read_orc(dataset, backend.df_engine.df_lib) dataset_df.src = dataset dataset, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -686,6 +741,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -698,6 +754,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -706,15 +763,17 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): - dataset_df = read_sas(dataset) + dataset_df = read_sas(dataset, backend.df_engine.df_lib) dataset_df.src = dataset dataset, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -730,6 +789,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -742,6 +802,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -750,15 +811,17 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): - dataset_df = read_spss(dataset) + dataset_df = read_spss(dataset, backend.df_engine.df_lib) dataset_df.src = dataset dataset, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -774,6 +837,7 @@ def preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): return _preprocess_file_for_training( @@ -786,6 +850,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) @@ -794,15 +859,17 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): - dataset_df = read_stata(dataset) + dataset_df = read_stata(dataset, backend.df_engine.df_lib) dataset_df.src = dataset dataset, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, - metadata=training_set_metadata + metadata=training_set_metadata, + backend=backend ) return dataset, training_set_metadata, None @@ -818,6 +885,7 @@ def 
preprocess_for_training( training_set_metadata=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): if dataset is None and training_set is None: @@ -881,7 +949,8 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ): hdf5_fp = dataset dataset = load_hdf5( @@ -919,8 +988,12 @@ def build_dataset( features, global_preprocessing_parameters, metadata=None, + backend=LOCAL_BACKEND, random_seed=default_random_seed, ): + df_engine = backend.df_engine + dataset_df = df_engine.parallelize(dataset_df) + global_preprocessing_parameters = merge_dict( default_preprocessing_parameters, global_preprocessing_parameters @@ -930,13 +1003,15 @@ def build_dataset( metadata = build_metadata( dataset_df, features, - global_preprocessing_parameters + global_preprocessing_parameters, + backend ) dataset = build_data( dataset_df, features, - metadata + metadata, + backend ) dataset[SPLIT] = get_split( @@ -946,13 +1021,14 @@ def build_dataset( 'split_probabilities' ], stratify=global_preprocessing_parameters['stratify'], + backend=backend, random_seed=random_seed ) return dataset, metadata -def build_metadata(dataset_df, features, global_preprocessing_parameters): +def build_metadata(dataset_df, features, global_preprocessing_parameters, backend): metadata = {} proc_feature_to_metadata = {} @@ -999,7 +1075,7 @@ def build_metadata(dataset_df, features, global_preprocessing_parameters): **preprocessing_parameters } - handle_missing_values( + dataset_df = handle_missing_values( dataset_df, feature, preprocessing_parameters @@ -1012,7 +1088,8 @@ def build_metadata(dataset_df, features, global_preprocessing_parameters): metadata[feature[NAME]] = get_feature_meta( dataset_df[feature[NAME]].astype(str), - preprocessing_parameters + preprocessing_parameters, + backend ) metadata[feature[NAME]][PREPROCESSING] = preprocessing_parameters @@ -1020,19 +1097,19 @@ def build_metadata(dataset_df, features, global_preprocessing_parameters): return metadata -def build_data(dataset_df, features, training_set_metadata): - dataset = {} +def build_data(input_df, features, training_set_metadata, backend): + proc_df = backend.df_engine.empty_df_like(input_df) for feature in features: if PROC_COLUMN not in feature: feature[PROC_COLUMN] = compute_feature_hash(feature) - if feature[PROC_COLUMN] not in dataset: + if feature[PROC_COLUMN] not in proc_df: preprocessing_parameters = \ training_set_metadata[feature[NAME]][ PREPROCESSING] - handle_missing_values( - dataset_df, + input_df = handle_missing_values( + input_df, feature, preprocessing_parameters ) @@ -1040,15 +1117,16 @@ def build_data(dataset_df, features, training_set_metadata): feature[TYPE], base_type_registry ).add_feature_data - add_feature_data( + proc_df = add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, training_set_metadata, - preprocessing_parameters + preprocessing_parameters, + backend ) - return dataset + return proc_df def precompute_fill_value(dataset_df, feature, preprocessing_parameters): @@ -1084,35 +1162,39 @@ def handle_missing_values(dataset_df, feature, preprocessing_parameters): method=missing_value_strategy, ) elif missing_value_strategy == DROP_ROW: - dataset_df.dropna(subset=[feature[COLUMN]], inplace=True) + dataset_df = dataset_df.dropna(subset=[feature[COLUMN]]) else: raise ValueError('Invalid missing value strategy') + return dataset_df + def 
get_split( dataset_df, force_split=False, split_probabilities=(0.7, 0.1, 0.2), stratify=None, + backend=LOCAL_BACKEND, random_seed=default_random_seed, ): if SPLIT in dataset_df and not force_split: split = dataset_df[SPLIT] else: set_random_seed(random_seed) + if stratify is None or stratify not in dataset_df: - split = np.random.choice( - 3, - len(dataset_df), - p=split_probabilities, + split = dataset_df.index.to_series().map( + lambda x: np.random.choice(3, 1, p=split_probabilities) ).astype(np.int8) else: split = np.zeros(len(dataset_df)) for val in dataset_df[stratify].unique(): + # TODO dask: find a way to better parallelize this operation idx_list = ( dataset_df.index[dataset_df[stratify] == val].tolist() ) - val_list = np.random.choice( + array_lib = backend.df_engine.array_lib + val_list = array_lib.random.choice( 3, len(idx_list), p=split_probabilities, @@ -1128,30 +1210,20 @@ def load_hdf5( shuffle_training=False ): logger.info('Loading data from: {0}'.format(hdf5_file_path)) - # Load data from file - hdf5_data = h5py.File(hdf5_file_path, 'r') - dataset = {} - for feature in features: - if feature[TYPE] == TEXT: - text_data_field = text_feature_data_field(feature) - dataset[text_data_field] = hdf5_data[text_data_field][()] - else: - dataset[feature[PROC_COLUMN]] = hdf5_data[feature[PROC_COLUMN]][()] + def shuffle(df): + return df.sample(frac=1).reset_index(drop=True) + + dataset = data_utils.load_hdf5(hdf5_file_path) if not split_data: - if SPLIT in hdf5_data: - dataset[SPLIT] = hdf5_data[SPLIT][()] - hdf5_data.close() if shuffle_training: - dataset = data_utils.shuffle_dict_unison_inplace(dataset) + dataset = shuffle(dataset) return dataset - split = hdf5_data[SPLIT][()] - hdf5_data.close() - training_set, test_set, validation_set = split_dataset_ttv(dataset, split) + training_set, test_set, validation_set = split_dataset_ttv(dataset, SPLIT) if shuffle_training: - training_set = data_utils.shuffle_dict_unison_inplace(training_set) + training_set = shuffle(training_set) return training_set, test_set, validation_set @@ -1171,6 +1243,7 @@ def preprocess_for_training( data_format=None, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): # sanity check to make sure some data source is provided @@ -1288,6 +1361,7 @@ def preprocess_for_training( training_set_metadata=training_set_metadata, skip_save_processed_input=skip_save_processed_input, preprocessing_params=preprocessing_params, + backend=backend, random_seed=random_seed ) training_set, test_set, validation_set, training_set_metadata = processed @@ -1300,29 +1374,30 @@ def preprocess_for_training( [training_set, validation_set, test_set] ) - training_dataset = Dataset( + df_engine = backend.df_engine + training_dataset = df_engine.create_dataset( training_set, - config['input_features'], - config['output_features'], - training_set_metadata.get(DATA_TRAIN_HDF5_FP) + TRAINING, + config, + training_set_metadata ) validation_dataset = None if validation_set is not None: - validation_dataset = Dataset( + validation_dataset = df_engine.create_dataset( validation_set, - config['input_features'], - config['output_features'], - training_set_metadata.get(DATA_TRAIN_HDF5_FP) + VALIDATION, + config, + training_set_metadata ) test_dataset = None if test_set is not None: - test_dataset = Dataset( + test_dataset = df_engine.create_dataset( test_set, - config['input_features'], - config['output_features'], - 
training_set_metadata.get(DATA_TRAIN_HDF5_FP) + TEST, + config, + training_set_metadata ) return ( @@ -1343,6 +1418,7 @@ def _preprocess_file_for_training( read_fn=read_csv, skip_save_processed_input=False, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, random_seed=default_random_seed ): """ @@ -1368,24 +1444,30 @@ def _preprocess_file_for_training( ) logger.info('Building dataset (it may take a while)') - dataset_df = read_fn(dataset) + dataset_df = read_fn(dataset, backend.df_engine.df_lib) dataset_df.src = dataset + data, training_set_metadata = build_dataset( dataset_df, features, preprocessing_params, metadata=training_set_metadata, + backend=backend, random_seed=random_seed ) - # save split values for use by visualization routines - split_fp = get_split_path(dataset) - save_array(split_fp, data[SPLIT]) + if backend.cache_enabled: + training_set_metadata[DATA_PROCESSED_CACHE_DIR] = backend.create_cache_entry() + + # TODO dask: consolidate hdf5 cache with backend cache + if is_on_master() and not skip_save_processed_input and backend.df_engine.use_hdf5_cache: + # save split values for use by visualization routines + split_fp = get_split_path(dataset) + save_array(split_fp, data[SPLIT]) - if is_on_master() and not skip_save_processed_input: logger.info('Writing preprocessed dataset cache') data_hdf5_fp = replace_file_extension(dataset, 'hdf5') - data_utils.save_hdf5(data_hdf5_fp, data, training_set_metadata) + data_utils.save_hdf5(data_hdf5_fp, data) logger.info('Writing train set metadata') training_set_metadata[DATA_TRAIN_HDF5_FP] = data_hdf5_fp @@ -1398,9 +1480,10 @@ def _preprocess_file_for_training( data_utils.save_json(training_set_metadata_fp, training_set_metadata) + # TODO dask: https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.random_split training_data, test_data, validation_data = split_dataset_ttv( data, - data[SPLIT] + SPLIT ) elif training_set: @@ -1413,11 +1496,12 @@ def _preprocess_file_for_training( ) logger.info('Building dataset (it may take a while)') - concatenated_df = concatenate_datasets( + concatenated_df = concatenate_files( training_set, validation_set, test_set, - read_fn=read_fn + read_fn, + backend ) concatenated_df.src = training_set @@ -1426,21 +1510,24 @@ def _preprocess_file_for_training( features, preprocessing_params, metadata=training_set_metadata, + backend=backend, random_seed=random_seed ) + if backend.cache_enabled: + training_set_metadata[DATA_PROCESSED_CACHE_DIR] = backend.create_cache_entry() + training_data, test_data, validation_data = split_dataset_ttv( data, - data[SPLIT] + SPLIT ) - if is_on_master() and not skip_save_processed_input: + if is_on_master() and not skip_save_processed_input and backend.df_engine.use_hdf5_cache: logger.info('Writing preprocessed training set cache') data_train_hdf5_fp = replace_file_extension(training_set, 'hdf5') data_utils.save_hdf5( data_train_hdf5_fp, training_data, - training_set_metadata ) if validation_set is not None: @@ -1452,7 +1539,6 @@ def _preprocess_file_for_training( data_utils.save_hdf5( data_validation_hdf5_fp, validation_data, - training_set_metadata ) if test_set is not None: @@ -1464,7 +1550,6 @@ def _preprocess_file_for_training( data_utils.save_hdf5( data_test_hdf5_fp, test_data, - training_set_metadata ) logger.info('Writing train set metadata') @@ -1496,6 +1581,7 @@ def _preprocess_df_for_training( test_set=None, training_set_metadata=None, preprocessing_params=default_preprocessing_parameters, + backend=LOCAL_BACKEND, 
random_seed=default_random_seed ): """ Method to pre-process dataframes. This doesn't have the option to save the @@ -1514,7 +1600,8 @@ def _preprocess_df_for_training( dataset = concatenate_df( training_set, validation_set, - test_set + test_set, + backend ) dataset, training_set_metadata = build_dataset( @@ -1522,11 +1609,16 @@ def _preprocess_df_for_training( features, preprocessing_params, metadata=training_set_metadata, - random_seed=random_seed + random_seed=random_seed, + backend=backend ) + + if backend.cache_enabled: + training_set_metadata[DATA_PROCESSED_CACHE_DIR] = backend.create_cache_entry() + training_set, test_set, validation_set = split_dataset_ttv( dataset, - dataset[SPLIT] + SPLIT ) return training_set, test_set, validation_set, training_set_metadata @@ -1537,7 +1629,8 @@ def preprocess_for_prediction( training_set_metadata=None, data_format=None, split=FULL, - include_outputs=True + include_outputs=True, + backend=LOCAL_BACKEND ): """Preprocesses the dataset to parse it into a format that is usable by the Ludwig core @@ -1612,7 +1705,8 @@ def preprocess_for_prediction( dataset, features, preprocessing_params, - training_set_metadata + training_set_metadata, + backend ) dataset, training_set_metadata, new_hdf5_fp = processed if new_hdf5_fp: @@ -1623,7 +1717,7 @@ def preprocess_for_prediction( if split != FULL: training_set, test_set, validation_set = split_dataset_ttv( dataset, - dataset[SPLIT] + SPLIT ) if split == TRAINING: dataset = training_set @@ -1632,10 +1726,15 @@ def preprocess_for_prediction( elif split == TEST: dataset = test_set + features = get_proc_features_from_lists( + config['input_features'], + output_features + ) + + # TODO dask: support postprocessing using Backend dataset = Dataset( dataset, - config['input_features'], - output_features, + features, hdf5_fp ) @@ -1670,6 +1769,7 @@ def calculate_checksum(original_dataset, config): config.get('output_features', []) + \ config.get('features', []) info['feature_names'] = [feature[NAME] for feature in features] + info['feature_types'] = [feature[TYPE] for feature in features] info['feature_preprocessing'] = [feature.get(PREPROCESSING, {}) for feature in features] hash = hash_dict(info, max_length=None) diff --git a/ludwig/features/audio_feature.py b/ludwig/features/audio_feature.py index e005b238341..9ba2b93677d 100644 --- a/ludwig/features/audio_feature.py +++ b/ludwig/features/audio_feature.py @@ -26,8 +26,7 @@ from ludwig.encoders.sequence_encoders import StackedCNN, ParallelCNN, \ StackedParallelCNN, StackedRNN, SequencePassthroughEncoder, StackedCNNRNN from ludwig.features.sequence_feature import SequenceInputFeature -from ludwig.utils.audio_utils import calculate_incr_mean -from ludwig.utils.audio_utils import calculate_incr_var +from ludwig.utils.audio_utils import calculate_mean, calculate_var from ludwig.utils.audio_utils import get_fbank from ludwig.utils.audio_utils import get_group_delay from ludwig.utils.audio_utils import get_length_in_samp @@ -57,7 +56,7 @@ class AudioFeatureMixin(object): } @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): try: import soundfile except ImportError: @@ -69,7 +68,7 @@ def get_feature_meta(column, preprocessing_parameters): sys.exit(-1) audio_feature_dict = preprocessing_parameters['audio_feature'] - first_audio_file_path = column[0] + first_audio_file_path = column.head(1)[0] _, sampling_rate_in_hz = soundfile.read(first_audio_file_path) feature_dim = 
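# Illustrative sketch of the new split_dataset_ttv contract used above: the split is
# now a column name on a single DataFrame rather than a separate array, empty splits
# come back as None, and the helper returns the frames in (train, test, validation)
# order. Pure pandas, mirroring the helper's behaviour.
import pandas as pd

data = pd.DataFrame({'x': [1, 2, 3, 4], 'split': [0, 0, 1, 2]})
train = data[data['split'] == 0].reset_index()
validation = data[data['split'] == 1].reset_index()
test = data[data['split'] == 2].reset_index()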
AudioFeatureMixin._get_feature_dim(audio_feature_dict, @@ -107,16 +106,17 @@ def _get_feature_dim(audio_feature_dict, sampling_rate_in_hz): return feature_dim @staticmethod - def _read_audio_and_transform_to_feature(filepath, audio_feature_dict, - feature_dim, max_length, - padding_value, normalization_type, - audio_stats): - """ - :param filepath: path to the audio - :param audio_feature_dict: dictionary describing audio feature see default - :param feature_dim: dimension of each feature frame - :param max_length: max audio length defined by user in samples - """ + def _process_in_memory( + column, + src_path, + audio_feature_dict, + feature_dim, + max_length, + padding_value, + normalization_type, + audio_file_length_limit_in_s, + backend + ): try: import soundfile except ImportError: @@ -127,10 +127,59 @@ def _read_audio_and_transform_to_feature(filepath, audio_feature_dict, ) sys.exit(-1) - feature_type = audio_feature_dict[TYPE] - audio, sampling_rate_in_hz = soundfile.read(filepath) - AudioFeatureMixin._update(audio_stats, audio, sampling_rate_in_hz) + def read_audio(path): + filepath = get_abs_path(src_path, path) + return soundfile.read(filepath) + + df_engine = backend.df_engine + raw_audio = df_engine.map_objects(column, read_audio) + processed_audio = df_engine.map_objects( + raw_audio, + lambda row: AudioFeatureMixin._transform_to_feature( + audio=row[0], + sampling_rate_in_hz=row[1], + audio_feature_dict=audio_feature_dict, + feature_dim=feature_dim, + max_length=max_length, + padding_value=padding_value, + normalization_type=normalization_type + ) + ) + audio_stats = df_engine.map_objects( + raw_audio, + lambda row: AudioFeatureMixin._get_stats( + audio=row[0], + sampling_rate_in_hz=row[1], + max_length_in_s=audio_file_length_limit_in_s, + ) + ) + + def reduce(series): + merged_stats = None + for audio_stats in series: + if merged_stats is None: + merged_stats = audio_stats.copy() + else: + AudioFeatureMixin._merge_stats(merged_stats, audio_stats) + return merged_stats + + merged_stats = df_engine.reduce_objects(audio_stats, reduce) + merged_stats['mean'] = calculate_mean(merged_stats['sum'], merged_stats['count']) + merged_stats['var'] = calculate_var(merged_stats['sum'], merged_stats['sum2'], merged_stats['count']) + return processed_audio, merged_stats + + @staticmethod + def _transform_to_feature( + audio, + sampling_rate_in_hz, + audio_feature_dict, + feature_dim, + max_length, + padding_value, + normalization_type + ): + feature_type = audio_feature_dict[TYPE] if feature_type == 'raw': audio_feature = np.expand_dims(audio, axis=-1) elif feature_type in ['stft', 'stft_phase', 'group_delay', 'fbank']: @@ -160,21 +209,25 @@ def _read_audio_and_transform_to_feature(filepath, audio_feature_dict, return audio_feature_padded @staticmethod - def _update(audio_stats, audio, sampling_rate_in_hz): + def _get_stats(audio, sampling_rate_in_hz, max_length_in_s): audio_length_in_s = audio.shape[-1] / float(sampling_rate_in_hz) - audio_stats['count'] += 1 - mean = ((audio_stats['count'] - 1) * audio_stats[ - 'mean'] + audio_length_in_s) / float(audio_stats['count']) - mean = calculate_incr_mean(audio_stats['count'], audio_stats['mean'], - audio_length_in_s) - var = calculate_incr_var(audio_stats['var'], audio_stats['mean'], mean, - audio_length_in_s) - audio_stats['mean'] = mean - audio_stats['var'] = var - audio_stats['max'] = max(audio_stats['max'], audio_length_in_s) - audio_stats['min'] = min(audio_stats['min'], audio_length_in_s) - if audio_length_in_s > 
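# Illustrative sketch of why the per-file audio stats above carry sum and sum2:
# mean and sample variance can be recovered from count, sum and sum-of-squares
# after an associative merge, which is what calculate_mean / calculate_var in
# audio_utils compute.
lengths = [1.5, 2.0, 3.25]  # hypothetical audio lengths in seconds
count = len(lengths)
total = sum(lengths)
total_sq = sum(x * x for x in lengths)
mean = total / count                                      # calculate_mean
var = (total_sq - total * total / count) / (count - 1)    # calculate_var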
audio_stats['max_length_in_s']: - audio_stats['cropped'] += 1 + return { + 'count': 1, + 'sum': audio_length_in_s, + 'sum2': audio_length_in_s * audio_length_in_s, + 'min': audio_length_in_s, + 'max': audio_length_in_s, + 'cropped': 1 if audio_length_in_s > max_length_in_s else 0 + } + + @staticmethod + def _merge_stats(merged_stats, audio_stats): + merged_stats['count'] += audio_stats['count'] + merged_stats['sum'] += audio_stats['sum'] + merged_stats['sum2'] += audio_stats['sum2'] + merged_stats['min'] = min(merged_stats['min'], audio_stats['min']) + merged_stats['max'] = max(merged_stats['max'], audio_stats['max']) + merged_stats['cropped'] += audio_stats['cropped'] @staticmethod def _get_2D_feature(audio, feature_type, audio_feature_dict, @@ -222,10 +275,11 @@ def _get_2D_feature(audio, feature_type, audio_feature_dict, @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, - preprocessing_parameters + preprocessing_parameters, + backend ): set_default_value( feature['preprocessing'], @@ -233,11 +287,11 @@ def add_feature_data( preprocessing_parameters['in_memory'] ) - if not 'audio_feature' in preprocessing_parameters: + if 'audio_feature' not in preprocessing_parameters: raise ValueError( 'audio_feature dictionary has to be present in preprocessing ' 'for audio.') - if not TYPE in preprocessing_parameters['audio_feature']: + if TYPE not in preprocessing_parameters['audio_feature']: raise ValueError( 'type has to be present in audio_feature dictionary ' 'for audio.') @@ -249,14 +303,14 @@ def add_feature_data( src_path = None # this is not super nice, but works both and DFs and lists first_path = '.' - for first_path in dataset_df[column]: + for first_path in input_df[column]: break - if hasattr(dataset_df, 'src'): - src_path = os.path.dirname(os.path.abspath(dataset_df.src)) + if hasattr(input_df, 'src'): + src_path = os.path.dirname(os.path.abspath(input_df.src)) if src_path is None and not os.path.isabs(first_path): raise ValueError('Audio file paths must be absolute') - num_audio_utterances = len(dataset_df) + num_audio_utterances = len(input_df) padding_value = preprocessing_parameters['padding_value'] normalization_type = preprocessing_parameters['norm'] @@ -269,33 +323,20 @@ def add_feature_data( if num_audio_utterances == 0: raise ValueError( 'There are no audio files in the dataset provided.') - audio_stats = { - 'count': 0, - 'mean': 0, - 'var': 0, - 'std': 0, - 'max': 0, - 'min': float('inf'), - 'cropped': 0, - 'max_length_in_s': audio_file_length_limit_in_s - } if feature[PREPROCESSING]['in_memory']: - dataset[proc_column] = np.empty( - (num_audio_utterances, max_length, feature_dim), - dtype=np.float32 + audio_features, audio_stats = AudioFeatureMixin._process_in_memory( + input_df[feature[NAME]], + src_path, + audio_feature_dict, + feature_dim, + max_length, + padding_value, + normalization_type, + audio_file_length_limit_in_s, + backend ) - for i, path in enumerate(dataset_df[column]): - filepath = get_abs_path( - src_path, - path - ) - audio_feature = AudioFeatureMixin._read_audio_and_transform_to_feature( - filepath, audio_feature_dict, feature_dim, max_length, - padding_value, normalization_type, audio_stats - ) - - dataset[proc_column][i, :, :] = audio_feature + proc_df[proc_column] = audio_features audio_stats['std'] = np.sqrt( audio_stats['var'] / float(audio_stats['count'])) @@ -312,8 +353,12 @@ def add_feature_data( audio_stats['count'], audio_stats['mean'], audio_stats['std'], audio_stats['max'], 
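# Illustrative sketch of the stats reduction above: per-file dicts produced by
# _get_stats are merged pairwise, so the reduction can run independently on each
# partition of a distributed DataFrame and still yield the global result.
a = {'count': 2, 'sum': 3.5, 'sum2': 6.25, 'min': 1.5, 'max': 2.0, 'cropped': 0}
b = {'count': 1, 'sum': 3.25, 'sum2': 10.5625, 'min': 3.25, 'max': 3.25, 'cropped': 1}
merged = {
    'count': a['count'] + b['count'],
    'sum': a['sum'] + b['sum'],
    'sum2': a['sum2'] + b['sum2'],
    'min': min(a['min'], b['min']),
    'max': max(a['max'], b['max']),
    'cropped': a['cropped'] + b['cropped'],
}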
audio_stats['min'], audio_stats['cropped'], - audio_stats['max_length_in_s']) + audio_file_length_limit_in_s) logger.debug(print_statistics) + else: + backend.check_lazy_load_supported(feature) + + return proc_df @staticmethod def _get_max_length_feature( @@ -374,7 +419,8 @@ def call(self, inputs, training=None, mask=None): return encoder_output - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.float32 def get_input_shape(self): diff --git a/ludwig/features/bag_feature.py b/ludwig/features/bag_feature.py index 6c262c1321b..6b4a8b1c618 100644 --- a/ludwig/features/bag_feature.py +++ b/ludwig/features/bag_feature.py @@ -42,12 +42,13 @@ class BagFeatureMixin(object): } @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): idx2str, str2idx, str2freq, max_size, _, _, _ = create_vocabulary( column, preprocessing_parameters['tokenizer'], num_most_frequent=preprocessing_parameters['most_common'], - lowercase=preprocessing_parameters['lowercase'] + lowercase=preprocessing_parameters['lowercase'], + processor=backend.df_engine, ) return { 'idx2str': idx2str, @@ -58,37 +59,36 @@ def get_feature_meta(column, preprocessing_parameters): } @staticmethod - def feature_data(column, metadata, preprocessing_parameters): - bag_matrix = np.zeros( - (len(column), - len(metadata['str2idx'])), - dtype=np.float32 - ) - - for i, set_str in enumerate(column): + def feature_data(column, metadata, preprocessing_parameters, backend): + def to_vector(set_str): + bag_vector = np.zeros((len(metadata['str2idx']),), dtype=np.float32) col_counter = Counter(set_str_to_idx( set_str, metadata['str2idx'], preprocessing_parameters['tokenizer']) ) - bag_matrix[i, list(col_counter.keys())] = list( - col_counter.values()) - return bag_matrix + bag_vector[list(col_counter.keys())] = list(col_counter.values()) + return bag_vector + + return backend.df_engine.map_objects(column, to_vector) @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, - preprocessing_parameters=None + preprocessing_parameters, + backend ): - dataset[feature[PROC_COLUMN]] = BagFeatureMixin.feature_data( - dataset_df[feature[COLUMN]].astype(str), + proc_df[feature[PROC_COLUMN]] = BagFeatureMixin.feature_data( + input_df[feature[COLUMN]].astype(str), metadata[feature[NAME]], - preprocessing_parameters + preprocessing_parameters, + backend ) + return proc_df class BagInputFeature(BagFeatureMixin, InputFeature): @@ -111,7 +111,8 @@ def call(self, inputs, training=None, mask=None): return {'encoder_output': encoder_output} - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.float32 def get_input_shape(self): diff --git a/ludwig/features/base_feature.py b/ludwig/features/base_feature.py index 18f7d9370a1..5a07c3398c9 100644 --- a/ludwig/features/base_feature.py +++ b/ludwig/features/base_feature.py @@ -80,8 +80,9 @@ def create_input(self): dtype=self.get_input_dtype(), name=self.name + '_input') + @classmethod @abstractmethod - def get_input_dtype(self): + def get_input_dtype(cls): """Returns the Tensor data type this input accepts.""" pass @@ -183,8 +184,9 @@ def create_input(self): dtype=self.get_output_dtype(), name=self.name + '_input') + @classmethod @abstractmethod - def get_output_dtype(self): + def get_output_dtype(cls): """Returns the Tensor data type feature outputs.""" pass diff --git a/ludwig/features/binary_feature.py b/ludwig/features/binary_feature.py index 
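# Illustrative sketch of the per-row bag vector built in BagFeatureMixin above,
# using a tiny hand-made str2idx and whitespace splitting as a stand-in for
# set_str_to_idx with the configured tokenizer.
import numpy as np
from collections import Counter

str2idx = {'a': 0, 'b': 1, 'c': 2}
row = 'a b b'
counts = Counter(str2idx[token] for token in row.split())
bag_vector = np.zeros(len(str2idx), dtype=np.float32)
bag_vector[list(counts.keys())] = list(counts.values())  # -> [1., 2., 0.]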
f4872404631..212db143103 100644 --- a/ludwig/features/binary_feature.py +++ b/ludwig/features/binary_feature.py @@ -50,21 +50,23 @@ class BinaryFeatureMixin(object): } @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): return {} @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, - preprocessing_parameters=None + preprocessing_parameters, + backend ): - column = dataset_df[feature[COLUMN]] + column = input_df[feature[COLUMN]] if column.dtype == object: column = column.map(str2bool) - dataset[feature[PROC_COLUMN]] = column.astype(np.bool_).values + proc_df[feature[PROC_COLUMN]] = column.astype(np.bool_).values + return proc_df class BinaryInputFeature(BinaryFeatureMixin, InputFeature): @@ -93,7 +95,8 @@ def call(self, inputs, training=None, mask=None): return encoder_outputs - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.bool def get_input_shape(self): @@ -195,7 +198,8 @@ def _setup_metrics(self): # else: # metric_fn.update_state(targets, predictions[PREDICTIONS]) - def get_output_dtype(self): + @classmethod + def get_output_dtype(cls): return tf.bool def get_output_shape(self): diff --git a/ludwig/features/category_feature.py b/ludwig/features/category_feature.py index c086bf374c9..902edd9423d 100644 --- a/ludwig/features/category_feature.py +++ b/ludwig/features/category_feature.py @@ -54,12 +54,13 @@ class CategoryFeatureMixin(object): } @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): idx2str, str2idx, str2freq, _, _, _, _ = create_vocabulary( column, 'stripped', num_most_frequent=preprocessing_parameters['most_common'], lowercase=preprocessing_parameters['lowercase'], - add_padding=False + add_padding=False, + processor=backend.df_engine ) return { 'idx2str': idx2str, @@ -70,29 +71,28 @@ def get_feature_meta(column, preprocessing_parameters): @staticmethod def feature_data(column, metadata): - return np.array( - column.map( - lambda x: ( - metadata['str2idx'][x.strip()] - if x.strip() in metadata['str2idx'] - else metadata['str2idx'][UNKNOWN_SYMBOL] - ) - ), - dtype=int_type(metadata['vocab_size']) - ) + return column.map( + lambda x: ( + metadata['str2idx'][x.strip()] + if x.strip() in metadata['str2idx'] + else metadata['str2idx'][UNKNOWN_SYMBOL] + ) + ).astype(int_type(metadata['vocab_size'])) @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, - preprocessing_parameters=None + preprocessing_parameters, + backend ): - dataset[feature[PROC_COLUMN]] = CategoryFeatureMixin.feature_data( - dataset_df[feature[COLUMN]].astype(str), - metadata[feature[NAME]] + proc_df[feature[PROC_COLUMN]] = CategoryFeatureMixin.feature_data( + input_df[feature[COLUMN]].astype(str), + metadata[feature[NAME]], ) + return proc_df class CategoryInputFeature(CategoryFeatureMixin, InputFeature): @@ -120,7 +120,8 @@ def call(self, inputs, training=None, mask=None): return {'encoder_output': encoder_output} - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.int32 def get_input_shape(self): @@ -198,7 +199,8 @@ def predictions( LOGITS: logits } - def get_output_dtype(self): + @classmethod + def get_output_dtype(cls): return tf.int64 def get_output_shape(self): diff --git a/ludwig/features/date_feature.py b/ludwig/features/date_feature.py index 92f347af03d..232087dc8f9 100644 
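# Illustrative sketch of the category mapping above, assuming a toy vocabulary in
# which '<UNK>' plays the role of UNKNOWN_SYMBOL and int8 stands in for the dtype
# chosen by int_type(vocab_size).
import pandas as pd

str2idx = {'<UNK>': 0, 'cat': 1, 'dog': 2}
column = pd.Series([' cat ', 'bird', 'dog'])
indices = column.map(
    lambda x: str2idx[x.strip()] if x.strip() in str2idx else str2idx['<UNK>']
).astype('int8')  # -> 1, 0, 2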
--- a/ludwig/features/date_feature.py +++ b/ludwig/features/date_feature.py @@ -41,7 +41,7 @@ class DateFeatureMixin(object): } @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): return { 'preprocessing': preprocessing_parameters } @@ -95,20 +95,20 @@ def date_to_list(date_str, datetime_format, preprocessing_parameters): @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, - preprocessing_parameters=None + preprocessing_parameters, + backend ): datetime_format = preprocessing_parameters['datetime_format'] - dates_to_lists = [ - np.array(DateFeatureMixin.date_to_list( - row, datetime_format, preprocessing_parameters - )) - for row in dataset_df[feature[COLUMN]] - ] - dataset[feature[PROC_COLUMN]] = np.array(dates_to_lists, - dtype=np.int16) + proc_df[feature[PROC_COLUMN]] = backend.df_engine.map_objects( + input_df[feature[COLUMN]], + lambda x: np.array(DateFeatureMixin.date_to_list( + x, datetime_format, preprocessing_parameters + ), dtype=np.int16) + ) + return proc_df class DateInputFeature(DateFeatureMixin, InputFeature): @@ -132,7 +132,8 @@ def call(self, inputs, training=None, mask=None): return inputs_encoded - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.int16 def get_input_shape(self): diff --git a/ludwig/features/h3_feature.py b/ludwig/features/h3_feature.py index ab348ad082a..9ec041c06bd 100644 --- a/ludwig/features/h3_feature.py +++ b/ludwig/features/h3_feature.py @@ -41,7 +41,7 @@ class H3FeatureMixin(object): } @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): return {} @staticmethod @@ -61,17 +61,22 @@ def h3_to_list(h3_int): @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, - preprocessing_parameters + preprocessing_parameters, + backend ): - column = dataset_df[feature[COLUMN]] + column = input_df[feature[COLUMN]] if column.dtype == object: column = column.map(int) column = column.map(H3FeatureMixin.h3_to_list) - dataset[feature[PROC_COLUMN]] = np.array(column.tolist(), - dtype=np.uint8) + + proc_df[feature[PROC_COLUMN]] = backend.df_engine.map_objects( + column, + lambda x: np.array(x, dtype=np.uint8) + ) + return proc_df class H3InputFeature(H3FeatureMixin, InputFeature): @@ -96,7 +101,8 @@ def call(self, inputs, training=None, mask=None): return inputs_encoded - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.uint8 def get_input_shape(self): diff --git a/ludwig/features/image_feature.py b/ludwig/features/image_feature.py index 79bf581900e..a7bc085cd65 100644 --- a/ludwig/features/image_feature.py +++ b/ludwig/features/image_feature.py @@ -47,7 +47,7 @@ class ImageFeatureMixin(object): } @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): return { PREPROCESSING: preprocessing_parameters } @@ -223,10 +223,11 @@ def _finalize_preprocessing_parameters( @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, - preprocessing_parameters + preprocessing_parameters, + backend ): set_default_value( feature[PREPROCESSING], @@ -239,14 +240,14 @@ def add_feature_data( preprocessing_parameters['num_processes'] ) src_path = None - if hasattr(dataset_df, 'src'): - src_path = 
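# Illustrative sketch of df_engine.map_objects for dates: on the local backend it
# behaves like Series.map, turning each date string into an int16 numpy vector.
# The parser below is a simplified stand-in for DateFeatureMixin.date_to_list.
import numpy as np
import pandas as pd
from ludwig.backend import LOCAL_BACKEND

def toy_date_to_list(s):
    dt = pd.Timestamp(s)
    return [dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second]

column = pd.Series(['2020-11-25 17:46:30'])
proc = LOCAL_BACKEND.df_engine.map_objects(
    column, lambda s: np.array(toy_date_to_list(s), dtype=np.int16)
)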
os.path.dirname(os.path.abspath(dataset_df.src)) + if hasattr(input_df, 'src'): + src_path = os.path.dirname(os.path.abspath(input_df.src)) - num_images = len(dataset_df) + num_images = len(input_df) if num_images == 0: raise ValueError('There are no images in the dataset provided.') - first_path = next(iter(dataset_df[feature[COLUMN]])) + first_path = next(iter(input_df[feature[COLUMN]])) if src_path is None and not os.path.isabs(first_path): raise ValueError('Image file paths must be absolute') @@ -278,8 +279,6 @@ def add_feature_data( resize_method=preprocessing_parameters['resize_method'], user_specified_num_channels=user_specified_num_channels ) - all_file_paths = [get_abs_path(src_path, file_path) - for file_path in dataset_df[feature[COLUMN]]] if feature[PREPROCESSING]['in_memory']: # Number of processes to run in parallel for preprocessing @@ -287,34 +286,38 @@ def add_feature_data( metadata[feature[NAME]][PREPROCESSING][ 'num_processes'] = num_processes - dataset[feature[PROC_COLUMN]] = np.empty( - (num_images, height, width, num_channels), - dtype=np.uint8 - ) # Split the dataset into pools only if we have an explicit request to use # multiple processes. In case we have multiple input images use the # standard code anyway. - if num_processes > 1 or num_images > 1: + if backend.supports_multiprocessing and (num_processes > 1 or num_images > 1): + all_file_paths = [get_abs_path(src_path, file_path) + for file_path in input_df[feature[NAME]]] + with Pool(num_processes) as pool: logger.debug( 'Using {} processes for preprocessing images'.format( num_processes ) ) - dataset[feature[PROC_COLUMN]] = np.array( - pool.map(read_image_and_resize, all_file_paths) - ) - + proc_df[feature[PROC_COLUMN]] = pool.map(read_image_and_resize, all_file_paths) else: # If we're not running multiple processes and we are only processing one # image just use this faster shortcut, bypassing multiprocessing.Pool.map logger.debug( - 'No process pool initialized. Using one process for preprocessing images' + 'No process pool initialized. 
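# Illustrative sketch of the image preprocessing dispatch above: a multiprocessing
# pool is only used when the backend declares supports_multiprocessing, otherwise
# rows are mapped through df_engine.map_objects. The reader is a dummy stand-in
# for read_image_and_resize and the paths are hypothetical.
import numpy as np
import pandas as pd
from multiprocessing import Pool
from ludwig.backend import LOCAL_BACKEND

def toy_read_image_and_resize(path):
    return np.zeros((8, 8, 3), dtype=np.uint8)

paths = pd.Series(['img_0.png', 'img_1.png'])
if LOCAL_BACKEND.supports_multiprocessing and len(paths) > 1:
    with Pool(2) as pool:
        images = pool.map(toy_read_image_and_resize, paths)
else:
    images = LOCAL_BACKEND.df_engine.map_objects(paths, toy_read_image_and_resize)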
Using internal process for preprocessing images' + ) + + proc_df[feature[PROC_COLUMN]] = backend.df_engine.map_objects( + input_df[feature[COLUMN]], + lambda file_path: read_image_and_resize(get_abs_path(src_path, file_path)) ) - img = read_image_and_resize(all_file_paths[0]) - dataset[feature[PROC_COLUMN]] = np.array([img]) else: - data_fp = os.path.splitext(dataset_df.src)[0] + '.hdf5' + backend.check_lazy_load_supported(feature) + + all_file_paths = [get_abs_path(src_path, file_path) + for file_path in input_df[feature[NAME]]] + + data_fp = os.path.splitext(input_df.src)[0] + '.hdf5' mode = 'w' if os.path.isfile(data_fp): mode = 'r+' @@ -332,7 +335,8 @@ def add_feature_data( ) h5_file.flush() - dataset[feature[PROC_COLUMN]] = np.arange(num_images) + proc_df[feature[PROC_COLUMN]] = np.arange(num_images) + return proc_df class ImageInputFeature(ImageFeatureMixin, InputFeature): @@ -363,7 +367,8 @@ def call(self, inputs, training=None, mask=None): return inputs_encoded - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.uint8 def get_input_shape(self): diff --git a/ludwig/features/numerical_feature.py b/ludwig/features/numerical_feature.py index 64d9aa21db8..2cf4ca9b551 100644 --- a/ludwig/features/numerical_feature.py +++ b/ludwig/features/numerical_feature.py @@ -48,17 +48,18 @@ class NumericalFeatureMixin(object): } @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): + compute = backend.df_engine.compute if preprocessing_parameters['normalization'] is not None: if preprocessing_parameters['normalization'] == 'zscore': return { - 'mean': column.astype(np.float32).mean(), - 'std': column.astype(np.float32).std() + 'mean': compute(column.astype(np.float32).mean()), + 'std': compute(column.astype(np.float32).std()) } elif preprocessing_parameters['normalization'] == 'minmax': return { - 'min': column.astype(np.float32).min(), - 'max': column.astype(np.float32).max() + 'min': compute(column.astype(np.float32).min()), + 'max': compute(column.astype(np.float32).max()) } else: logger.info( @@ -73,25 +74,27 @@ def get_feature_meta(column, preprocessing_parameters): @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, preprocessing_parameters, + backend ): - dataset[feature[PROC_COLUMN]] = dataset_df[feature[COLUMN]].astype( + proc_df[feature[PROC_COLUMN]] = input_df[feature[COLUMN]].astype( np.float32).values if preprocessing_parameters['normalization'] is not None: if preprocessing_parameters['normalization'] == 'zscore': mean = metadata[feature[NAME]]['mean'] std = metadata[feature[NAME]]['std'] - dataset[feature[PROC_COLUMN]] = (dataset[ + proc_df[feature[PROC_COLUMN]] = (proc_df[ feature[ PROC_COLUMN]] - mean) / std elif preprocessing_parameters['normalization'] == 'minmax': min_ = metadata[feature[NAME]]['min'] max_ = metadata[feature[NAME]]['max'] - values = dataset[feature[PROC_COLUMN]] - dataset[feature[PROC_COLUMN]] = (values - min_) / (max_ - min_) + values = proc_df[feature[PROC_COLUMN]] + proc_df[feature[PROC_COLUMN]] = (values - min_) / (max_ - min_) + return proc_df class NumericalInputFeature(NumericalFeatureMixin, InputFeature): @@ -117,7 +120,8 @@ def call(self, inputs, training=None, mask=None): return inputs_encoded - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.float32 def get_input_shape(self): @@ -226,7 +230,8 @@ def _setup_metrics(self): # for metric in 
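# Illustrative sketch of the zscore path above: aggregate statistics are pulled
# through df_engine.compute (a pass-through for pandas, but the hook a lazy engine
# would need to materialize results) and then applied to the whole column.
import numpy as np
import pandas as pd
from ludwig.backend import LOCAL_BACKEND

column = pd.Series([1.0, 2.0, 4.0], dtype=np.float32)
compute = LOCAL_BACKEND.df_engine.compute
mean = compute(column.mean())
std = compute(column.std())
normalized = (column - mean) / std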
self.metric_functions.values(): # metric.update_state(targets, predictions[PREDICTIONS]) - def get_output_dtype(self): + @classmethod + def get_output_dtype(cls): return tf.float32 def get_output_shape(self): diff --git a/ludwig/features/sequence_feature.py b/ludwig/features/sequence_feature.py index 3d697815bbb..f1a5071730b 100644 --- a/ludwig/features/sequence_feature.py +++ b/ludwig/features/sequence_feature.py @@ -68,7 +68,7 @@ class SequenceFeatureMixin(object): } @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): idx2str, str2idx, str2freq, max_length, _, _, _ = create_vocabulary( column, preprocessing_parameters['tokenizer'], lowercase=preprocessing_parameters['lowercase'], @@ -76,7 +76,7 @@ def get_feature_meta(column, preprocessing_parameters): vocab_file=preprocessing_parameters['vocab_file'], unknown_symbol=preprocessing_parameters['unknown_symbol'], padding_symbol=preprocessing_parameters['padding_symbol'], - + processor=backend.df_engine ) max_length = min( preprocessing_parameters['sequence_length_limit'], @@ -91,7 +91,7 @@ def get_feature_meta(column, preprocessing_parameters): } @staticmethod - def feature_data(column, metadata, preprocessing_parameters): + def feature_data(column, metadata, preprocessing_parameters, backend): sequence_data = build_sequence_matrix( sequences=column, inverse_vocabulary=metadata['str2idx'], @@ -103,22 +103,27 @@ def feature_data(column, metadata, preprocessing_parameters): lowercase=preprocessing_parameters['lowercase'], tokenizer_vocab_file=preprocessing_parameters[ 'vocab_file' - ] + ], + processor=backend.df_engine ) return sequence_data @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, - preprocessing_parameters + preprocessing_parameters, + backend ): sequence_data = SequenceInputFeature.feature_data( - dataset_df[feature[COLUMN]].astype(str), - metadata[feature[NAME]], preprocessing_parameters) - dataset[feature[PROC_COLUMN]] = sequence_data + input_df[feature[COLUMN]].astype(str), + metadata[feature[NAME]], preprocessing_parameters, + backend + ) + proc_df[feature[PROC_COLUMN]] = sequence_data + return proc_df class SequenceInputFeature(SequenceFeatureMixin, InputFeature): @@ -148,7 +153,8 @@ def call(self, inputs, training=None, mask=None): encoder_output[LENGTHS] = lengths return encoder_output - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.int32 def get_input_shape(self): @@ -260,7 +266,8 @@ def predictions(self, inputs, training=None): # Generator Decoder return self.decoder_obj._predictions_eval(inputs, training=training) - def get_output_dtype(self): + @classmethod + def get_output_dtype(cls): return tf.int32 def get_output_shape(self): diff --git a/ludwig/features/set_feature.py b/ludwig/features/set_feature.py index 7108f900f28..44e1dd3db3b 100644 --- a/ludwig/features/set_feature.py +++ b/ludwig/features/set_feature.py @@ -47,12 +47,13 @@ class SetFeatureMixin(object): } @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): idx2str, str2idx, str2freq, max_size, _, _, _ = create_vocabulary( column, preprocessing_parameters['tokenizer'], num_most_frequent=preprocessing_parameters['most_common'], - lowercase=preprocessing_parameters['lowercase'] + lowercase=preprocessing_parameters['lowercase'], + processor=backend.df_engine ) return { 'idx2str': idx2str, @@ -63,40 +64,36 
@@ def get_feature_meta(column, preprocessing_parameters): } @staticmethod - def feature_data(column, metadata, preprocessing_parameters): - feature_vector = np.array( - column.map( - lambda x: set_str_to_idx( - x, - metadata['str2idx'], - preprocessing_parameters['tokenizer'] - ) + def feature_data(column, metadata, preprocessing_parameters, backend): + def to_dense(x): + feature_vector = set_str_to_idx( + x, + metadata['str2idx'], + preprocessing_parameters['tokenizer'] ) - ) - set_matrix = np.zeros( - (len(column), - len(metadata['str2idx'])), - ) + set_vector = np.zeros((len(metadata['str2idx']),)) + set_vector[feature_vector] = 1 + return set_vector.astype(np.bool) - for i in range(len(column)): - set_matrix[i, feature_vector[i]] = 1 - - return set_matrix.astype(np.bool) + return backend.df_engine.map_objects(column, to_dense) @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, preprocessing_parameters, + backend ): - dataset[feature[PROC_COLUMN]] = SetFeatureMixin.feature_data( - dataset_df[feature[COLUMN]].astype(str), + proc_df[feature[PROC_COLUMN]] = SetFeatureMixin.feature_data( + input_df[feature[COLUMN]].astype(str), metadata[feature[NAME]], - preprocessing_parameters + preprocessing_parameters, + backend ) + return proc_df class SetInputFeature(SetFeatureMixin, InputFeature): @@ -121,7 +118,8 @@ def call(self, inputs, training=None, mask=None): return {'encoder_output': encoder_output} - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.bool def get_input_shape(self): @@ -213,7 +211,8 @@ def _setup_metrics(self): self.metric_functions[LOSS] = self.eval_loss_function self.metric_functions[JACCARD] = JaccardMetric() - def get_output_dtype(self): + @classmethod + def get_output_dtype(cls): return tf.bool def get_output_shape(self): diff --git a/ludwig/features/text_feature.py b/ludwig/features/text_feature.py index bce5dfd56a5..c6598cdf84d 100644 --- a/ludwig/features/text_feature.py +++ b/ludwig/features/text_feature.py @@ -58,7 +58,7 @@ class TextFeatureMixin(object): } @staticmethod - def feature_meta(column, preprocessing_parameters): + def feature_meta(column, preprocessing_parameters, backend): ( char_idx2str, char_str2idx, @@ -75,7 +75,8 @@ def feature_meta(column, preprocessing_parameters): unknown_symbol=preprocessing_parameters['unknown_symbol'], padding_symbol=preprocessing_parameters['padding_symbol'], pretrained_model_name_or_path=preprocessing_parameters[ - 'pretrained_model_name_or_path'] + 'pretrained_model_name_or_path'], + processor=backend.df_engine ) ( word_idx2str, @@ -94,7 +95,8 @@ def feature_meta(column, preprocessing_parameters): unknown_symbol=preprocessing_parameters['unknown_symbol'], padding_symbol=preprocessing_parameters['padding_symbol'], pretrained_model_name_or_path=preprocessing_parameters[ - 'pretrained_model_name_or_path'] + 'pretrained_model_name_or_path'], + processor=backend.df_engine ) return ( char_idx2str, @@ -114,9 +116,9 @@ def feature_meta(column, preprocessing_parameters): ) @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): tf_meta = TextFeatureMixin.feature_meta( - column, preprocessing_parameters + column, preprocessing_parameters, backend ) ( char_idx2str, @@ -162,7 +164,7 @@ def get_feature_meta(column, preprocessing_parameters): } @staticmethod - def feature_data(column, metadata, preprocessing_parameters): + def feature_data(column, metadata, 
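# Illustrative sketch of the dense set encoding above: token indices from one row
# are scattered into a boolean multi-hot vector the size of the vocabulary.
import numpy as np

str2idx = {'red': 0, 'green': 1, 'blue': 2}
token_indices = [0, 2]  # stand-in for set_str_to_idx('red blue', str2idx, tokenizer)
set_vector = np.zeros(len(str2idx))
set_vector[token_indices] = 1
set_vector = set_vector.astype(bool)  # -> [True, False, True]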
preprocessing_parameters, backend): char_data = build_sequence_matrix( sequences=column, inverse_vocabulary=metadata['char_str2idx'], @@ -177,8 +179,8 @@ def feature_data(column, metadata, preprocessing_parameters): ], pretrained_model_name_or_path=preprocessing_parameters[ 'pretrained_model_name_or_path' - ] - + ], + processor=backend.df_engine ) word_data = build_sequence_matrix( sequences=column, @@ -194,7 +196,8 @@ def feature_data(column, metadata, preprocessing_parameters): ], pretrained_model_name_or_path=preprocessing_parameters[ 'pretrained_model_name_or_path' - ] + ], + processor=backend.df_engine ) return char_data, word_data @@ -202,17 +205,21 @@ def feature_data(column, metadata, preprocessing_parameters): @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, - preprocessing_parameters + preprocessing_parameters, + backend ): chars_data, words_data = TextFeatureMixin.feature_data( - dataset_df[feature[COLUMN]].astype(str), - metadata[feature[NAME]], preprocessing_parameters + input_df[feature[COLUMN]].astype(str), + metadata[feature[NAME]], + preprocessing_parameters, + backend ) - dataset['{}_char'.format(feature[PROC_COLUMN])] = chars_data - dataset['{}_word'.format(feature[PROC_COLUMN])] = words_data + proc_df['{}_char'.format(feature[PROC_COLUMN])] = chars_data + proc_df['{}_word'.format(feature[PROC_COLUMN])] = words_data + return proc_df class TextInputFeature(TextFeatureMixin, SequenceInputFeature): @@ -246,7 +253,8 @@ def call(self, inputs, training=None, mask=None): return encoder_output - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.int32 def get_input_shape(self): @@ -328,7 +336,8 @@ class TextOutputFeature(TextFeatureMixin, SequenceOutputFeature): def __init__(self, feature): super().__init__(feature) - def get_output_dtype(self): + @classmethod + def get_output_dtype(cls): return tf.int32 def get_output_shape(self): diff --git a/ludwig/features/timeseries_feature.py b/ludwig/features/timeseries_feature.py index ee651fef661..1a27950dcba 100644 --- a/ludwig/features/timeseries_feature.py +++ b/ludwig/features/timeseries_feature.py @@ -43,7 +43,7 @@ class TimeseriesFeatureMixin(object): } @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): tokenizer = get_from_registry( preprocessing_parameters['tokenizer'], tokenizer_registry @@ -65,20 +65,20 @@ def build_matrix( tokenizer_name, length_limit, padding_value, - padding='right' + padding, + backend ): tokenizer = get_from_registry( tokenizer_name, tokenizer_registry )() - max_length = 0 - ts_vectors = [] - for ts in timeseries: - ts_vector = np.array(tokenizer(ts)).astype(np.float32) - ts_vectors.append(ts_vector) - if len(ts_vector) > max_length: - max_length = len(ts_vector) + ts_vectors = backend.df_engine.map_objects( + timeseries, + lambda ts: np.array(tokenizer(ts)).astype(np.float32) + ) + + max_length = backend.df_engine.compute(ts_vectors.map(len).max()) if max_length < length_limit: logger.debug( 'max length of {0}: {1} < limit: {2}'.format( @@ -88,42 +88,49 @@ def build_matrix( ) ) max_length = length_limit - timeseries_matrix = np.full( - (len(timeseries), max_length), - padding_value, - dtype=np.float32 - ) - for i, vector in enumerate(ts_vectors): + + def pad(vector): + padded = np.full( + (max_length,), + padding_value, + dtype=np.float32 + ) limit = min(vector.shape[0], max_length) if padding == 'right': - timeseries_matrix[i, :limit] 
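# Illustrative sketch of the per-row padding above: instead of filling one large
# matrix, each timeseries vector is padded to max_length on its own, which maps
# cleanly onto df_engine.map_objects.
import numpy as np

max_length = 5
padding_value = 0.0

def pad_right(vector):
    padded = np.full((max_length,), padding_value, dtype=np.float32)
    limit = min(vector.shape[0], max_length)
    padded[:limit] = vector[:limit]
    return padded

pad_right(np.array([1.0, 2.0, 3.0], dtype=np.float32))  # -> [1., 2., 3., 0., 0.]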
= vector[:limit] + padded[:limit] = vector[:limit] else: # if padding == 'left - timeseries_matrix[i, max_length - limit:] = vector[:limit] - return timeseries_matrix + padded[max_length - limit:] = vector[:limit] + return padded + + return backend.df_engine.map_objects(ts_vectors, pad) @staticmethod - def feature_data(column, metadata, preprocessing_parameters): + def feature_data(column, metadata, preprocessing_parameters, backend): timeseries_data = TimeseriesFeatureMixin.build_matrix( column, preprocessing_parameters['tokenizer'], metadata['max_timeseries_length'], preprocessing_parameters['padding_value'], - preprocessing_parameters['padding']) + preprocessing_parameters['padding'], + backend) return timeseries_data @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, - preprocessing_parameters + preprocessing_parameters, + backend ): - dataset[feature[PROC_COLUMN]] = TimeseriesFeatureMixin.feature_data( - dataset_df[feature[COLUMN]].astype(str), + proc_df[feature[PROC_COLUMN]] = TimeseriesFeatureMixin.feature_data( + input_df[feature[COLUMN]].astype(str), metadata[feature[NAME]], - preprocessing_parameters + preprocessing_parameters, + backend ) + return proc_df class TimeseriesInputFeature(TimeseriesFeatureMixin, SequenceInputFeature): @@ -146,7 +153,8 @@ def call(self, inputs, training=None, mask=None): return encoder_output - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.float32 def get_input_shape(self): @@ -383,7 +391,8 @@ def populate_defaults(input_feature): # }) # ]) # -# def get_output_dtype(self): +# @classmethod +# def get_output_dtype(cls): # return tf.float32 # # def get_output_shape(self): diff --git a/ludwig/features/vector_feature.py b/ludwig/features/vector_feature.py index 115cfc19f20..ef9a46ae954 100644 --- a/ludwig/features/vector_feature.py +++ b/ludwig/features/vector_feature.py @@ -48,7 +48,7 @@ class VectorFeatureMixin(object): } @staticmethod - def get_feature_meta(column, preprocessing_parameters): + def get_feature_meta(column, preprocessing_parameters, backend): return { 'preprocessing': preprocessing_parameters } @@ -56,23 +56,24 @@ def get_feature_meta(column, preprocessing_parameters): @staticmethod def add_feature_data( feature, - dataset_df, - dataset, + input_df, + proc_df, metadata, preprocessing_parameters, + backend ): """ Expects all the vectors to be of the same size. The vectors need to be whitespace delimited strings. Missing values are not handled. 
""" - if len(dataset_df) == 0: + if len(input_df) == 0: raise ValueError("There are no vectors in the dataset provided") # Convert the string of features into a numpy array try: - dataset[feature[PROC_COLUMN]] = np.array( - [x.split() for x in dataset_df[feature[COLUMN]]], - dtype=np.float32 + proc_df[feature[PROC_COLUMN]] = backend.df_engine.map_objects( + input_df[feature[COLUMN]], + lambda x: np.array(x.split(), dtype=np.float32) ) except ValueError: logger.error( @@ -82,7 +83,7 @@ def add_feature_data( raise # Determine vector size - vector_size = len(dataset[feature[PROC_COLUMN]][0]) + vector_size = backend.df_engine.compute(proc_df[feature[PROC_COLUMN]].map(len).max()) if 'vector_size' in preprocessing_parameters: if vector_size != preprocessing_parameters['vector_size']: raise ValueError( @@ -95,6 +96,7 @@ def add_feature_data( logger.debug('Observed vector size: {}'.format(vector_size)) metadata[feature[NAME]]['vector_size'] = vector_size + return proc_df class VectorInputFeature(VectorFeatureMixin, InputFeature): @@ -120,7 +122,8 @@ def call(self, inputs, training=None, mask=None): return inputs_encoded - def get_input_dtype(self): + @classmethod + def get_input_dtype(cls): return tf.float32 def get_input_shape(self): @@ -216,7 +219,8 @@ def _setup_metrics(self): ) self.metric_functions[R2] = R2Score(name='metric_r2') - def get_output_dtype(self): + @classmethod + def get_output_dtype(cls): return tf.float32 def get_output_shape(self): diff --git a/ludwig/models/predictor.py b/ludwig/models/predictor.py index 4f7b9cd1c8e..f08febe3bae 100644 --- a/ludwig/models/predictor.py +++ b/ludwig/models/predictor.py @@ -9,7 +9,6 @@ from ludwig.constants import COMBINED, LOGITS from ludwig.globals import is_progressbar_disabled -from ludwig.utils.batcher import initialize_batcher from ludwig.utils.data_utils import save_csv, save_json from ludwig.utils.horovod_utils import is_on_master from ludwig.utils.misc_utils import sum_dicts @@ -43,8 +42,8 @@ def batch_predict( dataset, dataset_name=None ): - batcher = initialize_batcher( - dataset, self._batch_size, + batcher = dataset.initialize_batcher( + self._batch_size, should_shuffle=False, horovod=self._horovod ) @@ -102,8 +101,8 @@ def batch_evaluation( collect_predictions=False, dataset_name=None ): - batcher = initialize_batcher( - dataset, self._batch_size, + batcher = dataset.initialize_batcher( + self._batch_size, should_shuffle=False, horovod=self._horovod ) @@ -189,8 +188,7 @@ def batch_collect_activations( activation_model = tf.keras.Model(inputs=keras_model_inputs, outputs=output_nodes) - batcher = initialize_batcher( - dataset, + batcher = dataset.initialize_batcher( self._batch_size, should_shuffle=False ) diff --git a/ludwig/models/trainer.py b/ludwig/models/trainer.py index d659176d966..53f131868e1 100644 --- a/ludwig/models/trainer.py +++ b/ludwig/models/trainer.py @@ -42,7 +42,6 @@ from ludwig.modules.metric_modules import get_initial_validation_value from ludwig.modules.optimization_modules import ClippedOptimizer from ludwig.utils import time_utils -from ludwig.utils.batcher import initialize_batcher from ludwig.utils.data_utils import load_json, save_json from ludwig.utils.defaults import default_random_seed from ludwig.utils.horovod_utils import is_on_master @@ -458,8 +457,7 @@ def train( ) set_random_seed(self.random_seed) - batcher = initialize_batcher( - training_set, + batcher = training_set.initialize_batcher( batch_size=self.batch_size, seed=self.random_seed, horovod=self.horovod @@ -618,7 +616,7 @@ def train( 
step=progress_tracker.epoch, ) - if validation_set is not None and validation_set.size > 0: + if validation_set is not None and len(validation_set) > 0: # eval metrics on validation set self.evaluation( model, @@ -635,7 +633,7 @@ def train( step=progress_tracker.epoch, ) - if test_set is not None and test_set.size > 0: + if test_set is not None and len(test_set) > 0: # eval metrics on test set self.evaluation( model, @@ -734,8 +732,7 @@ def train_online( model, dataset, ): - batcher = initialize_batcher( - dataset, + batcher = dataset.initialize_batcher( batch_size=self.batch_size, horovod=self.horovod ) diff --git a/ludwig/utils/audio_utils.py b/ludwig/utils/audio_utils.py index 92564ffd691..19e8e0dff80 100644 --- a/ludwig/utils/audio_utils.py +++ b/ludwig/utils/audio_utils.py @@ -234,3 +234,11 @@ def calculate_incr_var(var_prev, mean_prev, mean, length): def calculate_incr_mean(count, mean, length): return mean + (length - mean) / float(count) + + +def calculate_var(sum1, sum2, count): + return (sum2 - ((sum1 * sum1) / float(count))) / float(count - 1) + + +def calculate_mean(sum1, count): + return sum1 / float(count) diff --git a/ludwig/utils/batcher.py b/ludwig/utils/batcher.py index 20b7e2b4b6e..8842c87eb1d 100644 --- a/ludwig/utils/batcher.py +++ b/ludwig/utils/batcher.py @@ -227,4 +227,4 @@ def initialize_batcher(dataset, batch_size=128, sampler, batch_size=batch_size, ignore_last=ignore_last) - return batcher + return batcher \ No newline at end of file diff --git a/ludwig/utils/data_utils.py b/ludwig/utils/data_utils.py index 867dd36b665..7cba4270d06 100644 --- a/ludwig/utils/data_utils.py +++ b/ludwig/utils/data_utils.py @@ -37,7 +37,10 @@ logger = logging.getLogger(__name__) +DATASET_SPLIT_URL = 'dataset_{}_fp' +DATA_PROCESSED_CACHE_DIR = 'data_processed_cache_dir' DATA_TRAIN_HDF5_FP = 'data_train_hdf5_fp' +HDF5_COLUMNS_KEY = 'columns' DICT_FORMATS = {'dict', 'dictionary', dict} DATAFRAME_FORMATS = {'dataframe', 'df', pd.DataFrame} CSV_FORMATS = {'csv'} @@ -62,6 +65,8 @@ ORC_FORMATS, SAS_FORMATS, SPSS_FORMATS, STATA_FORMATS)) +PANDAS_DF = pd + def get_split_path(dataset_fp): return os.path.splitext(dataset_fp)[0] + '.split.csv' @@ -81,11 +86,12 @@ def load_csv(data_fp): return data -def read_xsv(data_fp, separator=',', header=0, nrows=None, skiprows=None): +def read_xsv(data_fp, df_lib=PANDAS_DF, separator=',', header=0, nrows=None, skiprows=None): """ Helper method to read a csv file. Wraps around pd.read_csv to handle some exceptions. 
Can extend to cover cases as necessary :param data_fp: path to the xsv file + :param df_lib: DataFrame library used to read in the CSV :param separator: defaults separator to use for splitting :param header: header argument for pandas to read the csv :param nrows: number of rows to read from the csv, None means all @@ -102,14 +108,14 @@ def read_xsv(data_fp, separator=',', header=0, nrows=None, skiprows=None): pass try: - df = pd.read_csv(data_fp, sep=separator, header=header, - nrows=nrows, skiprows=skiprows) + df = df_lib.read_csv(data_fp, sep=separator, header=header, + nrows=nrows, skiprows=skiprows) except ParserError: logger.warning('Failed to parse the CSV with pandas default way,' ' trying \\ as escape character.') - df = pd.read_csv(data_fp, sep=separator, header=header, - escapechar='\\', - nrows=nrows, skiprows=skiprows) + df = df_lib.read_csv(data_fp, sep=separator, header=header, + escapechar='\\', + nrows=nrows, skiprows=skiprows) return df @@ -118,55 +124,55 @@ def read_xsv(data_fp, separator=',', header=0, nrows=None, skiprows=None): read_tsv = functools.partial(read_xsv, separator='\t') -def read_json(data_fp, normalize=False): +def read_json(data_fp, df_lib, normalize=False): if normalize: - return pd.json_normalize(load_json(data_fp)) + return df_lib.json_normalize(load_json(data_fp)) else: - return pd.read_json(data_fp) + return df_lib.read_json(data_fp) -def read_jsonl(data_fp): - return pd.read_json(data_fp, lines=True) +def read_jsonl(data_fp, df_lib): + return df_lib.read_json(data_fp, lines=True) -def read_excel(data_fp): - return pd.read_excel(data_fp) +def read_excel(data_fp, df_lib): + return df_lib.read_excel(data_fp) -def read_parquet(data_fp): - return pd.read_parquet(data_fp) +def read_parquet(data_fp, df_lib): + return df_lib.read_parquet(data_fp) -def read_pickle(data_fp): - return pd.read_pickle(data_fp) +def read_pickle(data_fp, df_lib): + return df_lib.read_pickle(data_fp) -def read_fwf(data_fp): - return pd.read_fwf(data_fp) +def read_fwf(data_fp, df_lib): + return df_lib.read_fwf(data_fp) -def read_feather(data_fp): - return pd.read_feather(data_fp) +def read_feather(data_fp, df_lib): + return df_lib.read_feather(data_fp) -def read_html(data_fp): - return pd.read_html(data_fp)[0] +def read_html(data_fp, df_lib): + return df_lib.read_html(data_fp)[0] -def read_orc(data_fp): - return pd.read_orc(data_fp) +def read_orc(data_fp, df_lib): + return df_lib.read_orc(data_fp) -def read_sas(data_fp): - return pd.read_sas(data_fp) +def read_sas(data_fp, df_lib): + return df_lib.read_sas(data_fp) -def read_spss(data_fp): - return pd.read_spss(data_fp) +def read_spss(data_fp, df_lib): + return df_lib.read_spss(data_fp) -def read_stata(data_fp): - return pd.read_stata(data_fp) +def read_stata(data_fp, df_lib): + return df_lib.read_stata(data_fp) def save_csv(data_fp, data): @@ -195,36 +201,42 @@ def save_json(data_fp, data, sort_keys=True, indent=4): indent=indent) -# to be tested -# also, when loading an hdf5 file -# most of the times you don't want -# to put everything in memory -# like this function does -# it's jsut for convenience for relatively small datasets -def load_hdf5(data_fp): - data = {} - with h5py.File(data_fp, 'r') as h5_file: - for key in h5_file.keys(): - data[key] = h5_file[key][()] - return data +def to_numpy_dataset(df): + dataset = {} + for col in df.columns: + dataset[col] = np.stack(df[col].to_numpy()) + return dataset -# def save_hdf5(data_fp: str, data: Dict[str, object]): -def save_hdf5(data_fp, data, metadata=None): - if metadata is 
None: - metadata = {} +def from_numpy_dataset(dataset): + col_mapping = {} + for k, v in dataset.items(): + *unstacked, = v + col_mapping[k] = unstacked + return pd.DataFrame.from_dict(col_mapping) + + +def save_hdf5(data_fp, data): mode = 'w' if os.path.isfile(data_fp): mode = 'r+' + + numpy_dataset = to_numpy_dataset(data) with h5py.File(data_fp, mode) as h5_file: - for key, value in data.items(): - dataset = h5_file.create_dataset(key, data=value) - if key in metadata: - if 'in_memory' in metadata[key]['preprocessing']: - if metadata[key]['preprocessing']['in_memory']: - dataset.attrs['in_memory'] = True - else: - dataset.attrs['in_memory'] = False + h5_file.create_dataset(HDF5_COLUMNS_KEY, data=np.array(data.columns.values, dtype='S')) + for column in data.columns: + h5_file.create_dataset(column, data=numpy_dataset[column]) + + +def load_hdf5(data_fp): + hdf5_data = h5py.File(data_fp, 'r') + columns = [s.decode('utf-8') for s in hdf5_data[HDF5_COLUMNS_KEY][()].tolist()] + + numpy_dataset = {} + for column in columns: + numpy_dataset[column] = hdf5_data[column][()] + + return from_numpy_dataset(numpy_dataset) def load_object(object_fp): @@ -366,21 +378,17 @@ def shuffle_dict_unison_inplace(np_dict, random_state=None): def split_dataset_ttv(dataset, split): - if SPLIT in dataset: - del dataset[SPLIT] - training_set = split_dataset(dataset, split, value_to_split=0) - validation_set = split_dataset(dataset, split, value_to_split=1) - test_set = split_dataset(dataset, split, value_to_split=2) + training_set = split_dataset(dataset, split, 0) + validation_set = split_dataset(dataset, split, 1) + test_set = split_dataset(dataset, split, 2) return training_set, test_set, validation_set def split_dataset(dataset, split, value_to_split=0): - splitted_dataset = {} - for key in dataset: - splitted_dataset[key] = dataset[key][split == value_to_split] - if len(splitted_dataset[key]) == 0: - return None - return splitted_dataset + split_df = dataset[dataset[split] == value_to_split] + if len(split_df) == 0: + return None + return split_df.reset_index() def collapse_rare_labels(labels, labels_limit): @@ -411,11 +419,9 @@ def load_from_file(file_name, field=None, dtype=int, ground_truth_split=2): :return: Experiment data as array """ if file_name.endswith('.hdf5') and field is not None: - hdf5_data = h5py.File(file_name, 'r') - split = hdf5_data[SPLIT][()] - column = hdf5_data[field][()] - hdf5_data.close() - array = column[split == ground_truth_split] # ground truth + dataset = pd.read_hdf(file_name, key=HDF5_COLUMNS_KEY) + column = dataset[field] + array = column[dataset[SPLIT] == ground_truth_split].values # ground truth elif file_name.endswith('.npy'): array = np.load(file_name) elif file_name.endswith('.csv'): diff --git a/ludwig/utils/misc_utils.py b/ludwig/utils/misc_utils.py index 6bea2041620..ebc209ad033 100644 --- a/ludwig/utils/misc_utils.py +++ b/ludwig/utils/misc_utils.py @@ -30,6 +30,7 @@ import numpy import ludwig.globals +from ludwig.constants import PROC_COLUMN from ludwig.utils.data_utils import figure_data_format @@ -243,5 +244,13 @@ def hash_dict(d: dict, max_length: Union[int, None] = 6) -> bytes: s = json.dumps(d, sort_keys=True, ensure_ascii=True) h = hashlib.md5(s.encode()) d = h.digest() - b = base64.b64encode(d) + b = base64.b64encode(d, altchars=b'__') return b[:max_length] + + +def get_proc_features(config): + return get_proc_features_from_lists(config['input_features'], config['output_features']) + + +def get_proc_features_from_lists(*args): + return {feature[PROC_COLUMN]: 
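# Illustrative sketch of the new column-oriented HDF5 cache layout used by
# save_hdf5 / load_hdf5 above: every DataFrame column is stacked into a single
# numpy array (to_numpy_dataset) and the dict of arrays can be turned back into a
# DataFrame afterwards, roughly what from_numpy_dataset does.
import numpy as np
import pandas as pd

df = pd.DataFrame({'num': [1.0, 2.0], 'vec': [np.zeros(3), np.ones(3)]})
numpy_dataset = {col: np.stack(df[col].to_numpy()) for col in df.columns}
restored = pd.DataFrame({col: list(values) for col, values in numpy_dataset.items()})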
feature for features in args for feature in features} diff --git a/ludwig/utils/strings_utils.py b/ludwig/utils/strings_utils.py index 1bb425609b2..cb27239b772 100644 --- a/ludwig/utils/strings_utils.py +++ b/ludwig/utils/strings_utils.py @@ -22,6 +22,7 @@ import numpy as np +from ludwig.data.dataframe.pandas import PANDAS from ludwig.utils.math_utils import int_type from ludwig.utils.misc_utils import get_from_registry from ludwig.utils.nlp_utils import load_nlp_pipeline, process_text @@ -96,11 +97,10 @@ def create_vocabulary( vocab_file=None, unknown_symbol=UNKNOWN_SYMBOL, padding_symbol=PADDING_SYMBOL, - pretrained_model_name_or_path=None + pretrained_model_name_or_path=None, + processor=PANDAS, ): vocab = None - max_line_length = 0 - unit_counts = Counter() tokenizer = get_from_registry( tokenizer_type, @@ -136,10 +136,11 @@ def create_vocabulary( elif vocab_file is not None: vocab = load_vocabulary(vocab_file) - for line in data: - processed_line = tokenizer(line.lower() if lowercase else line) - unit_counts.update(processed_line) - max_line_length = max(max_line_length, len(processed_line)) + processed_lines = data.map(lambda line: tokenizer(line.lower() if lowercase else line)) + processed_counts = processed_lines.explode().value_counts(sort=False) + processed_counts = processor.compute(processed_counts) + unit_counts = Counter(dict(processed_counts)) + max_line_length = processor.compute(processed_lines.map(len).max()) if vocab is None: vocab = [unit for unit, count in @@ -217,8 +218,8 @@ def build_sequence_matrix( unknown_symbol=UNKNOWN_SYMBOL, lowercase=True, tokenizer_vocab_file=None, - pretrained_model_name_or_path=None - + pretrained_model_name_or_path=None, + processor=PANDAS, ): tokenizer = get_from_registry(tokenizer_type, tokenizer_registry)( vocab_file=tokenizer_vocab_file, @@ -227,38 +228,36 @@ def build_sequence_matrix( format_dtype = int_type(len(inverse_vocabulary) - 1) - max_length = 0 - unit_vectors = [] - for sequence in sequences: - unit_indices_vector = _get_sequence_vector( - sequence, - tokenizer, - tokenizer_type, - format_dtype, - inverse_vocabulary, - lowercase=lowercase, - unknown_symbol=unknown_symbol - ) - unit_vectors.append(unit_indices_vector) - if len(unit_indices_vector) > max_length: - max_length = len(unit_indices_vector) + unit_vectors = sequences.map(lambda sequence: _get_sequence_vector( + sequence, + tokenizer, + tokenizer_type, + format_dtype, + inverse_vocabulary, + lowercase=lowercase, + unknown_symbol=unknown_symbol + )) + max_length = processor.compute(unit_vectors.map(len).max()) if max_length < length_limit: logging.debug('max length of {0}: {1} < limit: {2}'.format( format, max_length, length_limit )) max_length = length_limit - sequence_matrix = np.full((len(sequences), max_length), - inverse_vocabulary[padding_symbol], - dtype=format_dtype) - for i, vector in enumerate(unit_vectors): + + def pad(vector): + sequence = np.full((max_length,), + inverse_vocabulary[padding_symbol], + dtype=format_dtype) limit = min(vector.shape[0], max_length) if padding == 'right': - sequence_matrix[i, :limit] = vector[:limit] + sequence[:limit] = vector[:limit] else: # if padding == 'left - sequence_matrix[i, max_length - limit:] = vector[:limit] + sequence[max_length - limit:] = vector[:limit] + return sequence - return sequence_matrix + padded = processor.map_objects(unit_vectors, pad) + return padded class BaseTokenizer: diff --git a/requirements.txt b/requirements.txt index 9f3702b5df4..51dfb02c250 100644 --- a/requirements.txt +++ b/requirements.txt 
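# Illustrative sketch of the vectorized token counting above, using whitespace
# splitting as a stand-in for the configured tokenizer; explode + value_counts
# replaces the per-line Python loop with Series-level operations that a
# distributed engine can also execute.
from collections import Counter
import pandas as pd

lines = pd.Series(['a b a', 'b c'])
processed_lines = lines.map(lambda line: line.lower().split())
unit_counts = Counter(dict(processed_lines.explode().value_counts(sort=False)))
max_line_length = processed_lines.map(len).max()  # -> 3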
@@ -12,7 +12,6 @@ PyYAML>=3.12 absl-py requests - # new data format support xlwt # excel xlrd # excel diff --git a/tests/integration_tests/test_api.py b/tests/integration_tests/test_api.py index 144697ae36c..ea458b50a46 100644 --- a/tests/integration_tests/test_api.py +++ b/tests/integration_tests/test_api.py @@ -22,68 +22,12 @@ from ludwig.api import LudwigModel from ludwig.utils.data_utils import read_csv -from tests.integration_tests.utils import ENCODERS +from tests.integration_tests.utils import ENCODERS, run_api_experiment from tests.integration_tests.utils import category_feature from tests.integration_tests.utils import generate_data from tests.integration_tests.utils import sequence_feature -def run_api_experiment(input_features, output_features, data_csv): - """ - Helper method to avoid code repetition in running an experiment - :param input_features: input schema - :param output_features: output schema - :param data_csv: path to data - :return: None - """ - config = { - 'input_features': input_features, - 'output_features': output_features, - 'combiner': {'type': 'concat', 'fc_size': 14}, - 'training': {'epochs': 2} - } - - model = LudwigModel(config) - output_dir = None - - try: - # Training with csv - _, _, output_dir = model.train( - dataset=data_csv, - skip_save_processed_input=True, - skip_save_progress=True, - skip_save_unprocessed_output=True - ) - model.predict(dataset=data_csv) - - model_dir = os.path.join(output_dir, 'model') - loaded_model = LudwigModel.load(model_dir) - - # Necessary before call to get_weights() to materialize the weights - loaded_model.predict(dataset=data_csv) - - model_weights = model.model.get_weights() - loaded_weights = loaded_model.model.get_weights() - for model_weight, loaded_weight in zip(model_weights, loaded_weights): - assert np.allclose(model_weight, loaded_weight) - finally: - # Remove results/intermediate data saved to disk - shutil.rmtree(output_dir, ignore_errors=True) - - try: - # Training with dataframe - data_df = read_csv(data_csv) - _, _, output_dir = model.train( - dataset=data_df, - skip_save_processed_input=True, - skip_save_progress=True, - skip_save_unprocessed_output=True - ) - model.predict(dataset=data_df) - finally: - shutil.rmtree(output_dir, ignore_errors=True) - - def run_api_experiment_separated_datasets( input_features, output_features, diff --git a/tests/integration_tests/test_experiment.py b/tests/integration_tests/test_experiment.py index 257be71c202..001ffde62ea 100644 --- a/tests/integration_tests/test_experiment.py +++ b/tests/integration_tests/test_experiment.py @@ -19,21 +19,21 @@ import uuid from collections import namedtuple -import pandas as pd import pytest import yaml from ludwig.api import LudwigModel +from ludwig.backend import LOCAL_BACKEND from ludwig.data.concatenate_datasets import concatenate_df from ludwig.data.preprocessing import preprocess_for_training from ludwig.experiment import experiment_cli from ludwig.features.h3_feature import H3InputFeature from ludwig.predict import predict_cli -from ludwig.utils.data_utils import read_csv, replace_file_extension +from ludwig.utils.data_utils import read_csv from ludwig.utils.defaults import default_random_seed from tests.conftest import delete_temporary_data from tests.integration_tests.utils import ENCODERS, HF_ENCODERS, \ - HF_ENCODERS_SHORT, slow + HF_ENCODERS_SHORT, slow, create_data_set_to_use from tests.integration_tests.utils import audio_feature from tests.integration_tests.utils import bag_feature from tests.integration_tests.utils 
import binary_feature @@ -332,108 +332,6 @@ def test_experiment_image_inputs(image_parms: ImageParms, csv_filename: str): shutil.rmtree(image_dest_folder) -# helper function for generating training and test data with specified format -def create_data_set_to_use(data_format, raw_data): - # handles all data formats except for hdf5 - # assumes raw_data is a csv dataset generated by - # tests.integration_tests.utils.generate_data() function - - # support for writing to a fwf dataset based on this stackoverflow posting: - # https://stackoverflow.com/questions/16490261/python-pandas-write-dataframe-to-fixed-width-file-to-fwf - from tabulate import tabulate - def to_fwf(df, fname): - content = tabulate(df.values.tolist(), list(df.columns), - tablefmt="plain") - open(fname, "w").write(content) - - pd.DataFrame.to_fwf = to_fwf - - dataset_to_use = None - - if data_format == 'csv': - dataset_to_use = raw_data - - elif data_format in {'df', 'dict'}: - dataset_to_use = pd.read_csv(raw_data) - if data_format == 'dict': - dataset_to_use = dataset_to_use.to_dict(orient='list') - - elif data_format == 'excel': - dataset_to_use = replace_file_extension(raw_data, 'xlsx') - pd.read_csv(raw_data).to_excel( - dataset_to_use, - index=False - ) - - elif data_format == 'feather': - dataset_to_use = replace_file_extension(raw_data, 'feather') - pd.read_csv(raw_data).to_feather( - dataset_to_use - ) - - elif data_format == 'fwf': - dataset_to_use = replace_file_extension(raw_data, 'fwf') - pd.read_csv(raw_data).to_fwf( - dataset_to_use - ) - - elif data_format == 'html': - dataset_to_use = replace_file_extension(raw_data, 'html') - pd.read_csv(raw_data).to_html( - dataset_to_use, - index=False - ) - - elif data_format == 'json': - dataset_to_use = replace_file_extension(raw_data, 'json') - pd.read_csv(raw_data).to_json( - dataset_to_use, - orient='records' - ) - - elif data_format == 'jsonl': - dataset_to_use = replace_file_extension(raw_data, 'jsonl') - pd.read_csv(raw_data).to_json( - dataset_to_use, - orient='records', - lines=True - ) - - elif data_format == 'parquet': - dataset_to_use = replace_file_extension(raw_data, 'parquet') - pd.read_csv(raw_data).to_parquet( - dataset_to_use, - index=False - ) - - elif data_format == 'pickle': - dataset_to_use = replace_file_extension(raw_data, 'pickle') - pd.read_csv(raw_data).to_pickle( - dataset_to_use - ) - - elif data_format == 'stata': - dataset_to_use = replace_file_extension(raw_data, 'stata') - pd.read_csv(raw_data).to_stata( - dataset_to_use - ) - - elif data_format == 'tsv': - dataset_to_use = replace_file_extension(raw_data, 'tsv') - pd.read_csv(raw_data).to_csv( - dataset_to_use, - sep='\t', - index=False - ) - - else: - ValueError( - "'{}' is an unrecognized data format".format(data_format) - ) - - return dataset_to_use - - IMAGE_DATA_FORMATS_TO_TEST = ['csv', 'df', 'dict', 'hdf5'] @pytest.mark.parametrize('test_in_memory', [True, False]) @pytest.mark.parametrize('test_format', IMAGE_DATA_FORMATS_TO_TEST) @@ -868,7 +766,7 @@ def test_image_resizing_num_channel_handling(csv_filename): ) df2 = read_csv(rel_path) - df = concatenate_df(df1, df2, None) + df = concatenate_df(df1, df2, None, LOCAL_BACKEND) df.to_csv(rel_path, index=False) # Here the user sepcifiies number of channels. 
Exception shouldn't be thrown diff --git a/tests/integration_tests/test_kfold_cv.py b/tests/integration_tests/test_kfold_cv.py index a49e5b7f237..e58ce1eeb99 100644 --- a/tests/integration_tests/test_kfold_cv.py +++ b/tests/integration_tests/test_kfold_cv.py @@ -9,8 +9,7 @@ from ludwig.api import kfold_cross_validate from ludwig.experiment import kfold_cross_validate_cli from ludwig.utils.data_utils import load_json -from tests.integration_tests.test_experiment import create_data_set_to_use -from tests.integration_tests.utils import binary_feature +from tests.integration_tests.utils import binary_feature, create_data_set_to_use from tests.integration_tests.utils import category_feature from tests.integration_tests.utils import generate_data from tests.integration_tests.utils import numerical_feature diff --git a/tests/integration_tests/test_model_save_and_load.py b/tests/integration_tests/test_model_save_and_load.py index 0f3bb4fdc67..7d9f71c83e8 100644 --- a/tests/integration_tests/test_model_save_and_load.py +++ b/tests/integration_tests/test_model_save_and_load.py @@ -6,6 +6,7 @@ import tensorflow as tf from ludwig.api import LudwigModel +from ludwig.constants import SPLIT from ludwig.data.preprocessing import get_split from ludwig.utils.data_utils import split_dataset_ttv, read_csv from tests.integration_tests.utils import binary_feature, numerical_feature, \ @@ -62,9 +63,10 @@ def test_model_save_reload_api(csv_filename, tmp_path): } data_df = read_csv(data_csv_path) + data_df[SPLIT] = get_split(data_df) training_set, test_set, validation_set = split_dataset_ttv( data_df, - get_split(data_df) + SPLIT ) training_set = pd.DataFrame(training_set) validation_set = pd.DataFrame(validation_set) diff --git a/tests/integration_tests/test_sequence_features.py b/tests/integration_tests/test_sequence_features.py index 35a9a2bfc76..d0b67a35e35 100644 --- a/tests/integration_tests/test_sequence_features.py +++ b/tests/integration_tests/test_sequence_features.py @@ -9,7 +9,8 @@ from ludwig.data.dataset_synthesizer import build_synthetic_dataset from ludwig.data.preprocessing import preprocess_for_training from ludwig.features.feature_registries import update_config_with_metadata -from ludwig.utils.batcher import initialize_batcher +from ludwig.data.dataset_synthesizer import build_synthetic_dataset +from tests.integration_tests.utils import sequence_feature from tests.integration_tests.utils import ENCODERS from tests.integration_tests.utils import generate_data from tests.integration_tests.utils import run_experiment @@ -87,9 +88,7 @@ def setup_model_scaffolding( model.model = model.create_model(model.config) # setup batcher to go through synthetic data - batcher = initialize_batcher( - training_set - ) + batcher = training_set.initialize_batcher() return model, batcher diff --git a/tests/integration_tests/test_visualization_api.py b/tests/integration_tests/test_visualization_api.py index 86a356d162c..806301a745c 100644 --- a/tests/integration_tests/test_visualization_api.py +++ b/tests/integration_tests/test_visualization_api.py @@ -25,6 +25,7 @@ from ludwig import visualize from ludwig.api import LudwigModel +from ludwig.constants import SPLIT from ludwig.data.preprocessing import get_split from ludwig.utils.data_utils import read_csv, split_dataset_ttv from tests.integration_tests.utils import category_feature, \ @@ -132,8 +133,8 @@ def obtain_df_splits(data_csv): data_df = read_csv(data_csv) # Obtain data split array mapping data rows to split type # 0-train, 1-validation, 2-test - data_split 
= get_split(data_df) - train_split, test_split, val_split = split_dataset_ttv(data_df, data_split) + data_df[SPLIT] = get_split(data_df) + train_split, test_split, val_split = split_dataset_ttv(data_df, SPLIT) # Splits are python dictionaries not dataframes- they need to be converted. test_df = pd.DataFrame(test_split) train_df = pd.DataFrame(train_split) diff --git a/tests/integration_tests/utils.py b/tests/integration_tests/utils.py index 5c445f26f4d..9de747f8f14 100644 --- a/tests/integration_tests/utils.py +++ b/tests/integration_tests/utils.py @@ -24,13 +24,16 @@ from distutils.util import strtobool import cloudpickle +import numpy as np import pandas as pd +from ludwig.api import LudwigModel from ludwig.constants import VECTOR, COLUMN, NAME, PROC_COLUMN from ludwig.data.dataset_synthesizer import DATETIME_FORMATS from ludwig.data.dataset_synthesizer import build_synthetic_dataset from ludwig.experiment import experiment_cli from ludwig.features.feature_utils import compute_feature_hash +from ludwig.utils.data_utils import read_csv, replace_file_extension ENCODERS = [ 'embed', 'rnn', 'parallel_cnn', 'cnnrnn', 'stacked_parallel_cnn', @@ -95,7 +98,8 @@ def generate_data( input_features, output_features, filename='test_csv.csv', - num_examples=25 + num_examples=25, + ): """ Helper method to generate synthetic data based on input, output feature @@ -445,3 +449,161 @@ def wrapped_fn(*args, **kwargs): return results return wrapped_fn + + +def run_api_experiment(input_features, output_features, data_csv): + """ + Helper method to avoid code repetition in running an experiment + :param input_features: input schema + :param output_features: output schema + :param data_csv: path to data + :return: None + """ + config = { + 'input_features': input_features, + 'output_features': output_features, + 'combiner': {'type': 'concat', 'fc_size': 14}, + 'training': {'epochs': 2} + } + + model = LudwigModel(config) + output_dir = None + + try: + # Training with csv + _, _, output_dir = model.train( + dataset=data_csv, + skip_save_processed_input=True, + skip_save_progress=True, + skip_save_unprocessed_output=True + ) + model.predict(dataset=data_csv) + + model_dir = os.path.join(output_dir, 'model') + loaded_model = LudwigModel.load(model_dir) + + # Necessary before call to get_weights() to materialize the weights + loaded_model.predict(dataset=data_csv) + + model_weights = model.model.get_weights() + loaded_weights = loaded_model.model.get_weights() + for model_weight, loaded_weight in zip(model_weights, loaded_weights): + assert np.allclose(model_weight, loaded_weight) + finally: + # Remove results/intermediate data saved to disk + shutil.rmtree(output_dir, ignore_errors=True) + + try: + # Training with dataframe + data_df = read_csv(data_csv) + _, _, output_dir = model.train( + dataset=data_df, + skip_save_processed_input=True, + skip_save_progress=True, + skip_save_unprocessed_output=True + ) + model.predict(dataset=data_df) + finally: + shutil.rmtree(output_dir, ignore_errors=True) + + +def create_data_set_to_use(data_format, raw_data): + # helper function for generating training and test data with specified format + # handles all data formats except for hdf5 + # assumes raw_data is a csv dataset generated by + # tests.integration_tests.utils.generate_data() function + + # support for writing to a fwf dataset based on this stackoverflow posting: + # https://stackoverflow.com/questions/16490261/python-pandas-write-dataframe-to-fixed-width-file-to-fwf + from tabulate import tabulate + def to_fwf(df, 
fname): + content = tabulate(df.values.tolist(), list(df.columns), + tablefmt="plain") + open(fname, "w").write(content) + + pd.DataFrame.to_fwf = to_fwf + + dataset_to_use = None + + if data_format == 'csv': + dataset_to_use = raw_data + + elif data_format in {'df', 'dict'}: + dataset_to_use = pd.read_csv(raw_data) + if data_format == 'dict': + dataset_to_use = dataset_to_use.to_dict(orient='list') + + elif data_format == 'excel': + dataset_to_use = replace_file_extension(raw_data, 'xlsx') + pd.read_csv(raw_data).to_excel( + dataset_to_use, + index=False + ) + + elif data_format == 'feather': + dataset_to_use = replace_file_extension(raw_data, 'feather') + pd.read_csv(raw_data).to_feather( + dataset_to_use + ) + + elif data_format == 'fwf': + dataset_to_use = replace_file_extension(raw_data, 'fwf') + pd.read_csv(raw_data).to_fwf( + dataset_to_use + ) + + elif data_format == 'html': + dataset_to_use = replace_file_extension(raw_data, 'html') + pd.read_csv(raw_data).to_html( + dataset_to_use, + index=False + ) + + elif data_format == 'json': + dataset_to_use = replace_file_extension(raw_data, 'json') + pd.read_csv(raw_data).to_json( + dataset_to_use, + orient='records' + ) + + elif data_format == 'jsonl': + dataset_to_use = replace_file_extension(raw_data, 'jsonl') + pd.read_csv(raw_data).to_json( + dataset_to_use, + orient='records', + lines=True + ) + + elif data_format == 'parquet': + dataset_to_use = replace_file_extension(raw_data, 'parquet') + pd.read_csv(raw_data).to_parquet( + dataset_to_use, + index=False + ) + + elif data_format == 'pickle': + dataset_to_use = replace_file_extension(raw_data, 'pickle') + pd.read_csv(raw_data).to_pickle( + dataset_to_use + ) + + elif data_format == 'stata': + dataset_to_use = replace_file_extension(raw_data, 'stata') + pd.read_csv(raw_data).to_stata( + dataset_to_use + ) + + elif data_format == 'tsv': + dataset_to_use = replace_file_extension(raw_data, 'tsv') + pd.read_csv(raw_data).to_csv( + dataset_to_use, + sep='\t', + index=False + ) + + else: + ValueError( + "'{}' is an unrecognized data format".format(data_format) + ) + + return dataset_to_use diff --git a/tests/ludwig/utils/test_normalization.py b/tests/ludwig/utils/test_normalization.py index 97f5a88ed62..a2d8aaa520f 100644 --- a/tests/ludwig/utils/test_normalization.py +++ b/tests/ludwig/utils/test_normalization.py @@ -16,6 +16,7 @@ import numpy as np import pandas as pd +from ludwig.backend import LOCAL_BACKEND from ludwig.constants import PROC_COLUMN, COLUMN, NAME from ludwig.features.feature_utils import compute_feature_hash from ludwig.features.numerical_feature import NumericalFeatureMixin @@ -35,15 +36,15 @@ def numerical_feature(): 10 ]), columns=['x']) -data = pd.DataFrame(columns=['x']) +proc_df = pd.DataFrame(columns=['x']) def test_norm(): feature_1_meta = NumericalFeatureMixin.get_feature_meta( - data_df['x'], {'normalization': 'zscore'} + data_df['x'], {'normalization': 'zscore'}, LOCAL_BACKEND ) feature_2_meta = NumericalFeatureMixin.get_feature_meta( - data_df['x'], {'normalization': 'minmax'} + data_df['x'], {'normalization': 'minmax'}, LOCAL_BACKEND ) assert feature_1_meta['mean'] == 6 @@ -55,23 +56,25 @@ def test_norm(): NumericalFeatureMixin.add_feature_data( feature=num_feature, - dataset_df=data_df, - dataset=data, + input_df=data_df, + proc_df=proc_df, metadata={num_feature[NAME]: feature_1_meta}, - preprocessing_parameters={'normalization': 'zscore'} + preprocessing_parameters={'normalization': 'zscore'}, + backend=LOCAL_BACKEND ) - assert 
np.allclose(np.array(data[num_feature[PROC_COLUMN]]), + assert np.allclose(np.array(proc_df[num_feature[PROC_COLUMN]]), np.array([-1.26491106, -0.63245553, 0, 0.63245553, 1.26491106]) ) NumericalFeatureMixin.add_feature_data( feature=num_feature, - dataset_df=data_df, - dataset=data, + input_df=data_df, + proc_df=proc_df, metadata={num_feature[NAME]: feature_2_meta}, - preprocessing_parameters={'normalization': 'minmax'} + preprocessing_parameters={'normalization': 'minmax'}, + backend=LOCAL_BACKEND ) - assert np.allclose(np.array(data[num_feature[PROC_COLUMN]]), + assert np.allclose(np.array(proc_df[num_feature[PROC_COLUMN]]), np.array([0, 0.25, 0.5, 0.75, 1]) )
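
A few illustrative sketches of the patterns this patch introduces follow; they are not part of the diff and use toy data.

The reworked create_vocabulary replaces the explicit Python loop with Series operations so the same code path can later run on a distributed DataFrame engine. A minimal sketch of the counting logic using plain pandas (the real code routes the final compute() through the backend's processor, which is assumed here to be a pass-through for in-memory pandas objects):

    from collections import Counter

    import pandas as pd

    data = pd.Series(['a b b', 'b c', 'a'])
    tokenizer = str.split  # stand-in for the registered tokenizer; lowercasing omitted

    # tokenize every line, then count token occurrences across the whole column
    processed_lines = data.map(tokenizer)
    processed_counts = processed_lines.explode().value_counts(sort=False)
    unit_counts = Counter(dict(processed_counts))
    max_line_length = processed_lines.map(len).max()

    print(unit_counts)       # Counter({'b': 3, 'a': 2, 'c': 1})
    print(max_line_length)   # 3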
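
Similarly, build_sequence_matrix now pads each unit-index vector independently through a pad closure mapped over the column (processor.map_objects in the patched code) instead of filling one preallocated matrix in a loop. A rough pandas/numpy equivalent:

    import numpy as np
    import pandas as pd

    unit_vectors = pd.Series([np.array([3, 4]), np.array([5])])
    max_length = 3
    padding_value = 0        # stand-in for inverse_vocabulary[padding_symbol]
    padding = 'right'

    def pad(vector):
        # fixed-width row filled with the padding symbol, then partially overwritten
        sequence = np.full((max_length,), padding_value, dtype=np.int32)
        limit = min(vector.shape[0], max_length)
        if padding == 'right':
            sequence[:limit] = vector[:limit]
        else:  # left padding
            sequence[max_length - limit:] = vector[:limit]
        return sequence

    padded = unit_vectors.map(pad)   # stands in for processor.map_objects(unit_vectors, pad)
    print(np.stack(list(padded)))    # [[3 4 0], [5 0 0]]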
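
run_api_experiment and create_data_set_to_use were moved out of the individual test modules into tests/integration_tests/utils.py so that test_api.py, test_experiment.py and test_kfold_cv.py can share them. A usage sketch (the feature-generator arguments are illustrative; running it actually trains a small model, and create_data_set_to_use needs tabulate installed):

    from tests.integration_tests.utils import (
        category_feature,
        create_data_set_to_use,
        generate_data,
        run_api_experiment,
        sequence_feature,
    )

    input_features = [sequence_feature(reduce_output='sum')]
    output_features = [category_feature(vocab_size=3)]

    # generate a small synthetic csv, derive a tsv copy, and run the shared helper
    csv_path = generate_data(input_features, output_features, 'train.csv')
    tsv_path = create_data_set_to_use('tsv', csv_path)
    run_api_experiment(input_features, output_features, csv_path)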
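
concatenate_df, like most preprocessing entry points in this patch, now takes a backend as its last argument so the concatenation runs on whatever DataFrame engine the backend wraps; LOCAL_BACKEND keeps everything in plain pandas. Mirroring the call updated in test_image_resizing_num_channel_handling (the patched ludwig must be installed):

    import pandas as pd

    from ludwig.backend import LOCAL_BACKEND
    from ludwig.data.concatenate_datasets import concatenate_df

    df1 = pd.DataFrame({'image_path': ['a.png', 'b.png'], 'label': [0, 1]})
    df2 = pd.DataFrame({'image_path': ['c.png', 'd.png'], 'label': [1, 0]})

    # the None argument mirrors the call in the test above (no third dataframe)
    df = concatenate_df(df1, df2, None, LOCAL_BACKEND)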
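
The split-handling tests also change convention: the output of get_split() is stored in a SPLIT column on the dataframe, and split_dataset_ttv() is handed the column name rather than a separate split array. A sketch with simplified stand-in helpers (the real ones live in ludwig.data.preprocessing and ludwig.utils.data_utils, and the real split_dataset_ttv returns dictionaries rather than dataframes; the unpacking order train, test, validation matches the tests above):

    import numpy as np
    import pandas as pd

    SPLIT = 'split'   # same string value as ludwig.constants.SPLIT

    def get_split(df, probabilities=(0.7, 0.1, 0.2), random_seed=42):
        # stand-in: 0-train, 1-validation, 2-test, drawn with the given probabilities
        rng = np.random.RandomState(random_seed)
        return rng.choice(3, size=len(df), p=list(probabilities))

    def split_dataset_ttv(dataset, split_col):
        # stand-in: returns (train, test, validation), matching the unpacking order above
        return tuple(dataset[dataset[split_col] == s] for s in (0, 2, 1))

    data_df = pd.DataFrame({'x': range(10)})
    data_df[SPLIT] = get_split(data_df)
    training_set, test_set, validation_set = split_dataset_ttv(data_df, SPLIT)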
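
The expected arrays in the rewritten normalization test can be re-derived directly: the zscore values correspond to (x - mean) / std with pandas' default ddof=1 standard deviation, and minmax rescales to [0, 1]. A quick standalone check of those constants (this recomputes them with pandas only and does not call into ludwig):

    import numpy as np
    import pandas as pd

    x = pd.Series([2, 4, 6, 8, 10], dtype=float)

    zscore = (x - x.mean()) / x.std()                 # Series.std() defaults to ddof=1
    minmax = (x - x.min()) / (x.max() - x.min())

    assert x.mean() == 6
    assert np.allclose(zscore, [-1.26491106, -0.63245553, 0, 0.63245553, 1.26491106])
    assert np.allclose(minmax, [0, 0.25, 0.5, 0.75, 1])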