Added support for cloud object storage (S3, GCS, ADLS, etc.) #1164

Merged: 40 commits, May 5, 2021
Commits
d656a17  tgaddair    Apr 23, 2021  Added support for remote filesystems (S3, GCS, ADLS, etc.)
acc71e8  tgaddair    Apr 24, 2021  Use fsspec to open files
6cdf128  tgaddair    Apr 24, 2021  More file utils
ee399c8  tgaddair    Apr 24, 2021  Fixed fsspec modes
62910b2  tgaddair    Apr 24, 2021  Fixed various h5py issues
188f1c2  tgaddair    Apr 25, 2021  Fixed h5py
3712958  tgaddair    Apr 25, 2021  Added remote tests
5035c07  tgaddair    Apr 25, 2021  Fixed uploading
da4e359  tgaddair    Apr 25, 2021  Merge branch 'master' into remote-fs
028e7b2  tgaddair    Apr 25, 2021  Reduced epochs
46459dc  tgaddair    Apr 25, 2021  WIP added intermediate format to backend
24c0026  tgaddair    Apr 26, 2021  WIP refactor caching
8482292  tgaddair    Apr 26, 2021  WIP fixed HDF5
f14973a  tgaddair    Apr 26, 2021  WIP fixed HDF5
4b1ea8d  tgaddair    Apr 26, 2021  Set default data formats
643260a  tgaddair    Apr 26, 2021  Removed create_dataset
4f32bb3  tgaddair    Apr 26, 2021  Fixed checksum
3d53542  tgaddair    Apr 26, 2021  Refactor
0d36166  tgaddair    Apr 26, 2021  Fixed Dask and Ray
fe0ab92  tgaddair    Apr 26, 2021  Added assertions
8ef1ad3  tgaddair    Apr 28, 2021  Fixed image features
2d8743c  tgaddair    Apr 28, 2021  Fixed parquet cache
83e8cb7  tgaddair    Apr 28, 2021  Fixed caching
9be9de3  tgaddair    Apr 28, 2021  Fixed tests
b6996c6  tgaddair    Apr 28, 2021  Fixed loading from cached parquet
e38c46e  tgaddair    Apr 28, 2021  Fixed Dask tests
4596a56  tgaddair    Apr 28, 2021  Test disable parquet tests
9b30173  tgaddair    Apr 28, 2021  Close Petastorm reader when finished
02bb160  tgaddair    Apr 28, 2021  Added docker deps
318a0af  tgaddair    Apr 28, 2021  Try without build-essential
8df0839  tgaddair    Apr 28, 2021  Added back build-essential
1ef0d86  tgaddair    Apr 28, 2021  build-essential
50b75c0  tgaddair    Apr 29, 2021  Fixed preprocess_for_prediction
b4e043e  tgaddair    Apr 29, 2021  Fixed typo
686f50b  tgaddair    May 2, 2021   Use only alphanumeric characters in filenames
22e6134  tgaddair    May 2, 2021   Merge
96a7252  tgaddair    May 2, 2021   Removed expensive length check
e35fb5e  tgaddair    May 2, 2021   Revert "Removed expensive length check"
643d559  tgaddair    May 2, 2021   Upgraded petastorm
d239777  w4nderlust  May 5, 2021   Merge branch 'master' into remote-fs
29 changes: 15 additions & 14 deletions ludwig/api.py
@@ -29,6 +29,7 @@
 from typing import Dict, List, Optional, Tuple, Union
 
 import ludwig.contrib
+from ludwig.utils.fs_utils import upload_output_directory, open_file, path_exists, makedirs
 
 ludwig.contrib.contrib_import()
 import numpy as np
@@ -172,7 +173,7 @@ def __init__(
         """
         # check if config is a path or a dict
         if isinstance(config, str):  # assume path
-            with open(config, 'r') as def_file:
+            with open_file(config, 'r') as def_file:
                 config_dict = yaml.safe_load(def_file)
             self.config_fp = config
         else:
@@ -323,7 +324,7 @@ def train(
         """
         # setup directories and file names
         if model_resume_path is not None:
-            if os.path.exists(model_resume_path):
+            if path_exists(model_resume_path):
                 output_directory = model_resume_path
             else:
                 if self.backend.is_coordinator():
@@ -354,15 +355,15 @@
                 skip_save_processed_input
             )
 
-        description_fn = training_stats_fn = model_dir = None
-        if self.backend.is_coordinator():
-            if should_create_output_directory:
-                if not os.path.exists(output_directory):
-                    os.makedirs(output_directory, exist_ok=True)
-            description_fn, training_stats_fn, model_dir = get_file_names(
-                output_directory)
+        output_url = output_directory
+        with upload_output_directory(output_directory) as output_directory:
+            description_fn = training_stats_fn = model_dir = None
+            if self.backend.is_coordinator():
+                if should_create_output_directory:
+                    makedirs(output_directory, exist_ok=True)
+                description_fn, training_stats_fn, model_dir = get_file_names(
+                    output_directory)
 
-        with self.backend.create_cache_dir():
             if isinstance(training_set, Dataset) and training_set_metadata is not None:
                 preprocessed_data = (training_set, validation_set, test_set, training_set_metadata)
             else:
@@ -536,7 +537,7 @@ def train(
                 # Load the best weights from saved checkpoint
                 self.load_weights(model_dir)
 
-        return train_stats, preprocessed_data, output_directory
+        return train_stats, preprocessed_data, output_url
 
     def train_online(
             self,
@@ -698,7 +699,7 @@ def predict(
             skip_save_unprocessed_output and skip_save_predictions
         )
         if should_create_exp_dir:
-            os.makedirs(output_directory, exist_ok=True)
+            makedirs(output_directory, exist_ok=True)
 
         logger.debug('Postprocessing')
         postproc_predictions = postprocess(
@@ -842,7 +843,7 @@ def evaluate(
             skip_save_eval_stats
        )
         if should_create_exp_dir:
-            os.makedirs(output_directory, exist_ok=True)
+            makedirs(output_directory, exist_ok=True)
 
         if collect_predictions:
             logger.debug('Postprocessing')
@@ -1626,7 +1627,7 @@ def kfold_cross_validate(
 
     # if config is a path, convert to dictionary
     if isinstance(config, str):  # assume path
-        with open(config, 'r') as def_file:
+        with open_file(config, 'r') as def_file:
             config = yaml.safe_load(def_file)
 
     # check for k_fold
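The fs_utils helpers imported above (open_file, path_exists, makedirs, upload_output_directory) are introduced by this PR, but ludwig/utils/fs_utils.py itself is not shown in this diff. Going by the commit history ("Use fsspec to open files"), a minimal sketch of how such helpers might be built on fsspec could look like the following; every body here is an assumption, not the actual implementation:

# Hypothetical sketch only: assumes the helpers are thin wrappers over fsspec.
import tempfile
from contextlib import contextmanager

import fsspec
from fsspec.implementations.local import LocalFileSystem


def open_file(url, mode='r', **kwargs):
    # fsspec dispatches on the URL scheme (s3://, gs://, plain paths, ...).
    return fsspec.open(url, mode, **kwargs).open()


def path_exists(url):
    fs, _, paths = fsspec.get_fs_token_paths(url)
    return fs.exists(paths[0])


def makedirs(url, exist_ok=False):
    fs, _, paths = fsspec.get_fs_token_paths(url)
    fs.makedirs(paths[0], exist_ok=exist_ok)


@contextmanager
def upload_output_directory(url):
    fs, _, paths = fsspec.get_fs_token_paths(url)
    if isinstance(fs, LocalFileSystem):
        # Local output: write in place.
        yield url
    else:
        # Remote output: stage results locally, upload on exit. This would be
        # why train() rebinds output_directory inside the `with` block while
        # still returning the original output_url.
        with tempfile.TemporaryDirectory() as tmpdir:
            yield tmpdir
            fs.put(tmpdir, paths[0], recursive=True)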
32 changes: 16 additions & 16 deletions ludwig/backend/__init__.py
@@ -29,23 +29,23 @@
 ALL_BACKENDS = [LOCAL, DASK, HOROVOD, RAY]
 
 
-def get_local_backend():
-    return LOCAL_BACKEND
+def get_local_backend(**kwargs):
+    return LocalBackend(**kwargs)
 
 
-def create_dask_backend():
+def create_dask_backend(**kwargs):
     from ludwig.backend.dask import DaskBackend
-    return DaskBackend()
+    return DaskBackend(**kwargs)
 
 
-def create_horovod_backend():
+def create_horovod_backend(**kwargs):
     from ludwig.backend.horovod import HorovodBackend
-    return HorovodBackend()
+    return HorovodBackend(**kwargs)
 
 
-def create_ray_backend():
+def create_ray_backend(**kwargs):
     from ludwig.backend.ray import RayBackend
-    return RayBackend()
+    return RayBackend(**kwargs)
 
 
 backend_registry = {
@@ -57,17 +57,17 @@ def create_ray_backend():
 }
 
 
-def create_backend(backend):
-    if isinstance(backend, Backend):
-        return backend
+def create_backend(name, **kwargs):
+    if isinstance(name, Backend):
+        return name
 
-    if backend is None and has_horovodrun():
-        backend = HOROVOD
+    if name is None and has_horovodrun():
+        name = HOROVOD
 
-    return backend_registry[backend]()
+    return backend_registry[name](**kwargs)
 
 
-def initialize_backend(backend):
-    backend = create_backend(backend)
+def initialize_backend(name, **kwargs):
+    backend = create_backend(name, **kwargs)
     backend.initialize()
     return backend
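Since every factory now forwards **kwargs, the new Backend constructor options can be threaded through the registry. A usage sketch, assuming 'local' is the registry key for LocalBackend; the cache bucket path is hypothetical:

from ludwig.backend import initialize_backend

# Keyword arguments flow through backend_registry into the constructor,
# e.g. pointing the dataset cache at object storage.
backend = initialize_backend('local', cache_dir='s3://my-bucket/ludwig-cache')

# Passing an existing Backend instance returns it unchanged (after
# initialize() is called on it).
same_backend = initialize_backend(backend)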
48 changes: 12 additions & 36 deletions ludwig/backend/base.py
@@ -15,53 +15,29 @@
 # limitations under the License.
 # ==============================================================================
 
-import os
-import tempfile
-import uuid
-
 from abc import ABC, abstractmethod
-from contextlib import contextmanager
 
+from ludwig.data.cache.manager import CacheManager
 from ludwig.data.dataframe.pandas import PANDAS
+from ludwig.data.dataset import create_dataset_manager
 from ludwig.models.predictor import Predictor
 from ludwig.models.trainer import Trainer
 from ludwig.utils.tf_utils import initialize_tensorflow
 
 
-class CacheMixin:
-    _cache_dir: str
+class Backend(ABC):
+    def __init__(self, cache_dir=None, data_format=None):
+        self._dataset_manager = create_dataset_manager(self, data_format)
+        self._cache_manager = CacheManager(self._dataset_manager, cache_dir)
 
     @property
-    def cache_enabled(self):
-        return self._cache_dir is not None
-
-    def create_cache_entry(self):
-        return os.path.join(self.cache_dir, str(uuid.uuid1()))
+    def cache(self):
+        return self._cache_manager
 
     @property
-    def cache_dir(self):
-        if not self._cache_dir:
-            raise ValueError('Cache directory not available, try calling `with backend.create_cache_dir()`.')
-        return self._cache_dir
-
-    @contextmanager
-    def create_cache_dir(self):
-        prev_cache_dir = self._cache_dir
-        try:
-            if self._cache_dir:
-                os.makedirs(self._cache_dir, exist_ok=True)
-                yield self._cache_dir
-            else:
-                with tempfile.TemporaryDirectory() as tmpdir:
-                    self._cache_dir = tmpdir
-                    yield tmpdir
-        finally:
-            self._cache_dir = prev_cache_dir
-
-
-class Backend(CacheMixin, ABC):
-    def __init__(self, cache_dir=None):
-        self._cache_dir = cache_dir
+    def dataset_manager(self):
+        return self._dataset_manager
 
     @abstractmethod
     def initialize(self):
@@ -148,8 +124,8 @@ def is_coordinator(self):
 
 
 class LocalBackend(LocalPreprocessingMixin, LocalTrainingMixin, Backend):
-    def __init__(self):
-        super().__init__()
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
 
     def initialize(self):
         pass
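With CacheMixin gone, a Backend now composes a dataset manager (selected by data_format) and a CacheManager built on top of it, instead of hand-rolling temporary cache directories. A small sketch of the resulting surface; the argument values are hypothetical, and 'hdf5' as the local default is an assumption based on the "Set default data formats" commit:

from ludwig.backend.base import LocalBackend

# data_format selects the dataset manager; cache_dir may be a local or
# remote path now that fs_utils handles both.
backend = LocalBackend(cache_dir='/tmp/ludwig-cache', data_format='hdf5')

cache_manager = backend.cache              # the CacheManager
dataset_manager = backend.dataset_manager  # the format-specific manager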
11 changes: 8 additions & 3 deletions ludwig/backend/dask.py
@@ -16,14 +16,19 @@
 # ==============================================================================
 
 from ludwig.backend.base import Backend, LocalTrainingMixin
-from ludwig.constants import NAME
+from ludwig.constants import NAME, PARQUET
 from ludwig.data.dataframe.dask import DaskEngine
 
 
 class DaskBackend(LocalTrainingMixin, Backend):
-    def __init__(self):
-        super().__init__()
+    def __init__(self, data_format=PARQUET, **kwargs):
+        super().__init__(data_format=data_format, **kwargs)
         self._df_engine = DaskEngine()
+        if data_format != PARQUET:
+            raise ValueError(
+                f'Data format {data_format} is not supported when using the Dask backend. '
+                f'Try setting to `parquet`.'
+            )
 
     def initialize(self):
         pass
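The guard above pins the Dask backend's cache format to Parquet, presumably because the Dask engine works with partitioned data. For example:

from ludwig.backend.dask import DaskBackend

DaskBackend()                    # fine: data_format defaults to PARQUET
DaskBackend(data_format='hdf5')  # raises ValueError: only parquet is supported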
4 changes: 2 additions & 2 deletions ludwig/backend/horovod.py
@@ -25,8 +25,8 @@
 
 
 class HorovodBackend(LocalPreprocessingMixin, Backend):
-    def __init__(self):
-        super().__init__()
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
         self._horovod = None
 
     def initialize(self):
11 changes: 8 additions & 3 deletions ludwig/backend/ray.py
@@ -24,7 +24,7 @@
 from ray.util.dask import ray_dask_get
 
 from ludwig.backend.base import Backend, RemoteTrainingMixin
-from ludwig.constants import NAME
+from ludwig.constants import NAME, PARQUET
 from ludwig.data.dataframe.dask import DaskEngine
 from ludwig.models.predictor import BasePredictor, RemotePredictor
 from ludwig.models.trainer import BaseTrainer, RemoteTrainer
@@ -173,11 +173,16 @@ def shutdown(self):
 
 
 class RayBackend(RemoteTrainingMixin, Backend):
-    def __init__(self, horovod_kwargs=None):
-        super().__init__()
+    def __init__(self, horovod_kwargs=None, data_format=PARQUET, **kwargs):
+        super().__init__(data_format=data_format, **kwargs)
         self._df_engine = DaskEngine()
         self._horovod_kwargs = horovod_kwargs or {}
         self._tensorflow_kwargs = {}
+        if data_format != PARQUET:
+            raise ValueError(
+                f'Data format {data_format} is not supported when using the Ray backend. '
+                f'Try setting to `parquet`.'
+            )
 
     def initialize(self):
         try:
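The Ray backend applies the same Parquet-only check and still accepts horovod_kwargs for distributed training. A hypothetical construction; the num_workers key is an assumption about what horovod_kwargs accepts, since that code path is not shown in this diff:

from ludwig.backend.ray import RayBackend

# horovod_kwargs is stored for the Horovod-on-Ray setup; the accepted
# keys are not visible in this diff.
backend = RayBackend(horovod_kwargs={'num_workers': 4})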
3 changes: 3 additions & 0 deletions ludwig/constants.py
@@ -108,3 +108,6 @@
 
 CHECKSUM = 'checksum'
 
+HDF5 = 'hdf5'
+PARQUET = 'parquet'
+
Empty file added ludwig/data/cache/__init__.py
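These two constants name the supported cache formats: HDF5 (the pre-existing local format) and PARQUET (required by the Dask and Ray backends, per the checks above). A sketch of selecting them explicitly; the registry keys 'local' and 'dask' are assumptions:

from ludwig.backend import initialize_backend
from ludwig.constants import HDF5, PARQUET

local_backend = initialize_backend('local', data_format=HDF5)
dask_backend = initialize_backend('dask', data_format=PARQUET)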