Skip to content

Commit

Permalink
Added Backend interface for abstracting DataFrame preprocessing steps (
Browse files Browse the repository at this point in the history
…#1014)

Co-authored-by: w4nderlust <w4nderlust@gmail.com>
  • Loading branch information
tgaddair and w4nderlust committed Nov 26, 2020
1 parent e245fa2 commit 496009e
Show file tree
Hide file tree
Showing 40 changed files with 1,438 additions and 894 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ build/
develop-eggs/
dist/
downloads/
dataset/
./dataset/
eggs/
.eggs/
lib/
Expand Down Expand Up @@ -140,4 +140,4 @@ tags

# examples
examples/*/data/
examples/*/visualizations/
examples/*/visualizations/
327 changes: 176 additions & 151 deletions ludwig/api.py

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions ludwig/backend/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2020 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from ludwig.backend.base import Backend, LocalBackend


# Module-level LocalBackend instance, created once at import time and handed
# out by get_local_backend().
LOCAL_BACKEND = LocalBackend()


def get_local_backend():
    """Return the module-level ``LOCAL_BACKEND`` instance.

    No new backend is constructed; every call yields the same object.
    """
    return LOCAL_BACKEND


# Maps a backend name to a zero-argument factory returning the backend.
# Both 'local' and None (no backend requested) resolve to the local backend.
backend_registry = {
'local': get_local_backend,
None: get_local_backend,
}


def create_backend(name):
    """Look up ``name`` in ``backend_registry`` and return the backend it builds.

    :param name: registry key; ``'local'`` and ``None`` are the registered keys.
    :return: the object returned by the registered factory.
    :raises KeyError: if ``name`` is not a key of ``backend_registry``.
    """
    factory = backend_registry[name]
    return factory()
88 changes: 88 additions & 0 deletions ludwig/backend/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2020 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

import os
import tempfile
import uuid

from abc import ABC, abstractmethod
from contextlib import contextmanager

from ludwig.data.dataframe.pandas import PandasEngine


class Backend(ABC):
    """Abstract base class for backends that expose a DataFrame engine.

    Subclasses must implement ``df_engine``, ``supports_multiprocessing`` and
    ``check_lazy_load_supported``. The base class manages an optional cache
    directory, which is either supplied at construction time or created
    temporarily through ``create_cache_dir()``.
    """

    def __init__(self, cache_dir=None):
        """Store the optional cache directory.

        :param cache_dir: path of the directory used for cache entries, or
            ``None`` when no directory is configured.
        """
        self._cache_dir = cache_dir

    @property
    @abstractmethod
    def df_engine(self):
        """Return the DataFrame engine used by this backend."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def supports_multiprocessing(self):
        """Return whether this backend supports multiprocessing."""
        raise NotImplementedError()

    @abstractmethod
    def check_lazy_load_supported(self, feature):
        """Check lazy-loading support for ``feature``.

        NOTE(review): the contract is not visible here; presumably
        implementations raise when lazy loading is unsupported -- confirm
        against callers.
        """
        raise NotImplementedError()

    @property
    def cache_enabled(self):
        """Return ``True`` when a usable cache directory is currently set."""
        # Truthiness (not `is not None`) keeps this consistent with
        # `cache_dir` and `create_cache_dir`, which both treat an empty
        # string as "no directory". Otherwise `cache_enabled` could be True
        # while `cache_dir` raises ValueError.
        return bool(self._cache_dir)

    def create_cache_entry(self):
        """Return a new path inside the cache directory named by a UUID1.

        Only the path is built; nothing is created on disk.

        :raises ValueError: if no cache directory is available.
        """
        return os.path.join(self.cache_dir, str(uuid.uuid1()))

    @property
    def cache_dir(self):
        """Return the current cache directory.

        :raises ValueError: if no cache directory is set.
        """
        if not self._cache_dir:
            raise ValueError('Cache directory not available, try calling `with backend.create_cache_dir()`.')
        return self._cache_dir

    @contextmanager
    def create_cache_dir(self):
        """Context manager that guarantees a cache directory exists.

        If a directory is configured it is created when missing and yielded.
        Otherwise a temporary directory is created, used as the cache
        directory inside the ``with`` block, and removed on exit. The
        previous ``_cache_dir`` value is restored on exit in either case.
        """
        prev_cache_dir = self._cache_dir
        try:
            if self._cache_dir:
                os.makedirs(self._cache_dir, exist_ok=True)
                yield self._cache_dir
            else:
                with tempfile.TemporaryDirectory() as tmpdir:
                    self._cache_dir = tmpdir
                    yield tmpdir
        finally:
            # Restore even if the body raised, so a temporary directory never
            # outlives the `with` block as the configured cache location.
            self._cache_dir = prev_cache_dir


class LocalBackend(Backend):
    """Backend whose DataFrame engine is a ``PandasEngine``.

    It is constructed without a cache directory.
    """

    def __init__(self):
        super().__init__()
        self._df_engine = PandasEngine()

    @property
    def df_engine(self):
        """Return the ``PandasEngine`` created in ``__init__``."""
        return self._df_engine

    @property
    def supports_multiprocessing(self):
        """Always ``True`` for this backend."""
        return True

    def check_lazy_load_supported(self, feature):
        """Accept every ``feature``; this backend performs no check."""
        return None
57 changes: 35 additions & 22 deletions ludwig/data/concatenate_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,66 @@
import logging

import numpy as np
import pandas as pd

from ludwig.backend import LOCAL_BACKEND
from ludwig.constants import SPLIT
from ludwig.utils.data_utils import read_csv

logger = logging.getLogger(__name__)


def concatenate(train_csv, vali_csv, test_csv, output_csv):
concatenated_df = concatenate_datasets(train_csv, vali_csv, test_csv)
def concatenate_csv(train_csv, vali_csv, test_csv, output_csv):
concatenated_df = concatenate_files(
train_csv, vali_csv, test_csv, read_csv, LOCAL_BACKEND
)

logger.info('Saving concatenated dataset as csv..')
concatenated_df.to_csv(output_csv, encoding='utf-8', index=False)
logger.info('done')


def concatenate_datasets(training_set, Validation_set, test_set,
read_fn=read_csv):
logger.info('Loading training dataset...')
train_df = read_fn(training_set)
def concatenate_files(train_fname, vali_fname, test_fname, read_fn, backend):
df_lib = backend.df_engine.df_lib

logger.info('Loading training file...')
train_df = read_fn(train_fname, df_lib)
logger.info('done')

logger.info('Loading validation dataset..')
vali_df = read_fn(Validation_set) if Validation_set is not None else None
logger.info('Loading validation file..')
vali_df = read_fn(vali_fname, df_lib) if vali_fname is not None else None
logger.info('done')

logger.info('Loading test dataset..')
test_df = read_fn(test_set) if test_set is not None else None
logger.info('Loading test file..')
test_df = read_fn(test_fname, df_lib) if test_fname is not None else None
logger.info('done')

logger.info('Concatenating datasets..')
concatenated_df = concatenate_df(train_df, vali_df, test_df)
logger.info('Concatenating files..')
concatenated_df = concatenate_df(train_df, vali_df, test_df, backend)
logger.info('done')

return concatenated_df


def concatenate_df(train_df, vali_df, test_df):
def concatenate_df(train_df, vali_df, test_df, backend):
train_size = len(train_df)
vali_size = len(vali_df) if vali_df is not None else 0
test_size = len(test_df) if test_df is not None else 0
concatenated_df = pd.concat([train_df, vali_df, test_df],
ignore_index=True)
split = np.array(
[0] * train_size + [1] * vali_size + [2] * test_size,
dtype=np.int8

concatenated_df = backend.df_engine.df_lib.concat(
[df for df in [train_df, vali_df, test_df] if df is not None],
ignore_index=True
)
concatenated_df = concatenated_df.assign(split=pd.Series(split).values)

def get_split(idx):
if idx < train_size:
return 0
if idx < train_size + vali_size:
return 1
return 2

concatenated_df[SPLIT] = concatenated_df.index.to_series().map(
get_split
).astype(np.int8)

return concatenated_df


Expand Down Expand Up @@ -92,4 +105,4 @@ def concatenate_df(train_df, vali_df, test_df):
parser.add_argument('-o', '--output_csv', help='output csv')
args = parser.parse_args()

concatenate(args.train_csv, args.vali_csv, args.test_csv, args.output_csv)
concatenate_csv(args.train_csv, args.vali_csv, args.test_csv, args.output_csv)
16 changes: 16 additions & 0 deletions ludwig/data/dataframe/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2020 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
67 changes: 67 additions & 0 deletions ludwig/data/dataframe/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2020 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from abc import ABC, abstractmethod


class DataFrameEngine(ABC):
    """Abstract interface for the DataFrame operations a backend provides.

    Every member is abstract; the base implementations only raise
    ``NotImplementedError``.
    """

    @abstractmethod
    def empty_df_like(self, df):
        """Return an empty DataFrame modelled on ``df``."""
        raise NotImplementedError

    @abstractmethod
    def parallelize(self, data):
        """Return ``data`` prepared for parallel processing by the engine."""
        raise NotImplementedError

    @abstractmethod
    def persist(self, data):
        """Return ``data`` after asking the engine to persist it."""
        raise NotImplementedError

    @abstractmethod
    def compute(self, data):
        """Return the computed form of ``data``."""
        raise NotImplementedError

    @abstractmethod
    def from_pandas(self, df):
        """Convert a pandas DataFrame into the engine's DataFrame type."""
        raise NotImplementedError

    @abstractmethod
    def map_objects(self, series, map_fn):
        """Apply ``map_fn`` over ``series`` and return the result."""
        raise NotImplementedError

    @abstractmethod
    def reduce_objects(self, series, reduce_fn):
        """Reduce ``series`` with ``reduce_fn`` and return the result."""
        raise NotImplementedError

    @abstractmethod
    def create_dataset(self, dataset, tag, config, training_set_metadata):
        """Build the engine's dataset object from the processed ``dataset``."""
        raise NotImplementedError

    @property
    @abstractmethod
    def array_lib(self):
        """Array library used by the engine."""
        raise NotImplementedError

    @property
    @abstractmethod
    def df_lib(self):
        """DataFrame library used by the engine."""
        raise NotImplementedError

    @property
    @abstractmethod
    def use_hdf5_cache(self):
        """Whether the engine uses the HDF5 cache."""
        raise NotImplementedError
69 changes: 69 additions & 0 deletions ludwig/data/dataframe/pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2020 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

import numpy as np
import pandas as pd

from ludwig.data.dataset import Dataset
from ludwig.data.dataframe.base import DataFrameEngine
from ludwig.utils.data_utils import DATA_TRAIN_HDF5_FP
from ludwig.utils.misc_utils import get_proc_features


class PandasEngine(DataFrameEngine):
    """``DataFrameEngine`` implemented directly on top of pandas and numpy.

    ``parallelize``, ``persist``, ``compute`` and ``from_pandas`` return their
    argument unchanged.
    """

    def empty_df_like(self, df):
        """Return a DataFrame with no columns that shares ``df``'s index."""
        return pd.DataFrame(index=df.index)

    def parallelize(self, data):
        """Return ``data`` unchanged."""
        return data

    def persist(self, data):
        """Return ``data`` unchanged."""
        return data

    def compute(self, data):
        """Return ``data`` unchanged."""
        return data

    def from_pandas(self, df):
        """Return ``df`` unchanged."""
        return df

    def map_objects(self, series, map_fn):
        """Return ``series.map(map_fn)``."""
        return series.map(map_fn)

    def reduce_objects(self, series, reduce_fn):
        """Return ``reduce_fn`` applied to the whole ``series``."""
        return reduce_fn(series)

    def create_dataset(self, dataset, tag, config, training_set_metadata):
        """Wrap ``dataset`` in a ``Dataset``.

        The ``Dataset`` receives the features from ``get_proc_features(config)``
        and the ``DATA_TRAIN_HDF5_FP`` entry of ``training_set_metadata``
        (``None`` when absent). ``tag`` is not used by this engine.
        """
        features = get_proc_features(config)
        hdf5_fp = training_set_metadata.get(DATA_TRAIN_HDF5_FP)
        return Dataset(dataset, features, hdf5_fp)

    @property
    def array_lib(self):
        """The ``numpy`` module."""
        return np

    @property
    def df_lib(self):
        """The ``pandas`` module."""
        return pd

    @property
    def use_hdf5_cache(self):
        """Always ``True`` for this engine."""
        return True


# Module-level PandasEngine instance, created once at import time.
PANDAS = PandasEngine()
Loading

0 comments on commit 496009e

Please sign in to comment.