Added DataFrame wrapper type and fixed usage of optional imports #1371

Merged 7 commits on Oct 12, 2021
2 changes: 1 addition & 1 deletion .github/workflows/pytest.yml
@@ -30,7 +30,7 @@ jobs:
       - python-version: 3.7
         tensorflow-version: 2.5.0
       - python-version: 3.8
-        tensorflow-version: nightly
+        tensorflow-version: 2.5.0
     env:
       TENSORFLOW: ${{ matrix.tensorflow-version }}
       MARKERS: ${{ matrix.test-markers }}
2 changes: 1 addition & 1 deletion ludwig/api.py
@@ -31,7 +31,7 @@
 from pprint import pformat
 from typing import Dict, List, Optional, Tuple, Union

-from ludwig.data.dataset.partitioned import RayDataset
+from ludwig.data.dataset.ray import RayDataset
 from ludwig.utils.fs_utils import upload_output_directory, path_exists, makedirs

 import numpy as np
2 changes: 1 addition & 1 deletion ludwig/backend/ray.py
@@ -31,7 +31,7 @@
 from ludwig.data.dataframe.dask import DaskEngine
 from ludwig.data.dataframe.pandas import PandasEngine
 from ludwig.data.dataset.pandas import PandasDataset
-from ludwig.data.dataset.partitioned import RayDataset
+from ludwig.data.dataset.ray import RayDataset
 from ludwig.models.ecd import ECD
 from ludwig.models.predictor import BasePredictor, Predictor, get_output_columns
 from ludwig.models.trainer import BaseTrainer, RemoteTrainer
28 changes: 20 additions & 8 deletions ludwig/data/dataset/__init__.py
@@ -15,17 +15,29 @@
 # limitations under the License.
 # ==============================================================================

-from ludwig.data.dataset.pandas import PandasDatasetManager
-from ludwig.data.dataset.parquet import ParquetDatasetManager
-from ludwig.data.dataset.tfrecord import TFRecordDatasetManager
+def get_pandas_dataset_manager(**kwargs):
+    from ludwig.data.dataset.pandas import PandasDatasetManager
+    return PandasDatasetManager(**kwargs)
+
+
+def get_parquet_dataset_manager(**kwargs):
+    from ludwig.data.dataset.parquet import ParquetDatasetManager
+    return ParquetDatasetManager(**kwargs)
+
+
+def get_tfrecord_dataset_manager(**kwargs):
+    from ludwig.data.dataset.tfrecord import TFRecordDatasetManager
+    return TFRecordDatasetManager(**kwargs)
+
+
 dataset_registry = {
-    'parquet': ParquetDatasetManager,
-    'hdf5': PandasDatasetManager,
-    'tfrecord': TFRecordDatasetManager,
-    None: PandasDatasetManager,
+    'parquet': get_parquet_dataset_manager,
+    'hdf5': get_pandas_dataset_manager,
+    'tfrecord': get_tfrecord_dataset_manager,
+    None: get_pandas_dataset_manager,
 }


 def create_dataset_manager(backend, cache_format, **kwargs):
-    return dataset_registry.get(cache_format)(backend)
+    return dataset_registry[cache_format](backend=backend, **kwargs)
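Because the registry now holds factory functions instead of classes, importing ludwig.data.dataset no longer pulls in the parquet or tfrecord managers (or their optional dependencies such as petastorm); the heavy import only happens when a factory is called. A minimal sketch of the lazy lookup, assuming Ludwig is installed (instantiation is omitted because it needs a real backend object):

from ludwig.data.dataset import dataset_registry

# Resolving a factory is import-free; only calling it imports the
# corresponding dataset manager module and its dependencies.
hdf5_factory = dataset_registry['hdf5']
print(hdf5_factory.__name__)  # -> 'get_pandas_dataset_manager'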
2 changes: 1 addition & 1 deletion ludwig/data/dataset/parquet.py
@@ -19,7 +19,7 @@

 import tensorflow as tf
 from ludwig.data.dataset.pandas import PandasDataset
-from ludwig.data.dataset.partitioned import RayDataset
+from ludwig.data.dataset.ray import RayDataset
 from ludwig.utils.data_utils import DATA_TRAIN_HDF5_FP

 from petastorm import make_batch_reader
ludwig/data/dataset/partitioned.py → ludwig/data/dataset/ray.py
@@ -16,17 +16,14 @@
 # ==============================================================================
 from typing import Dict, List

-try:
-    from dask.dataframe import DataFrame
-    from ray.data import from_dask
-except ImportError:
-    pass
+from ludwig.utils.types import DataFrame


 class RayDataset(object):
     """ Wrapper around ray.data.Dataset. """

     def __init__(self, df: DataFrame, features: List[Dict], data_hdf5_fp: str):
+        from ray.data import from_dask
         self.ds = from_dask(df)
         self.features = features
         self.data_hdf5_fp = data_hdf5_fp
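With the ray.data import moved inside __init__, this module can be imported even when Ray is not installed; Ray is only needed when a RayDataset is actually constructed. A rough usage sketch, assuming both ray and dask are installed and using a throwaway dataframe and feature list:

import dask.dataframe as dd
import pandas as pd
import ray

from ludwig.data.dataset.ray import RayDataset

ray.init(ignore_reinit_error=True)

# Hypothetical two-partition Dask dataframe standing in for preprocessed data.
df = dd.from_pandas(pd.DataFrame({'text': ['a', 'b'], 'label': [0, 1]}), npartitions=2)
dataset = RayDataset(df, features=[{'name': 'text'}], data_hdf5_fp=None)
print(dataset.ds.count())  # row count of the wrapped ray.data.Dataset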
11 changes: 4 additions & 7 deletions ludwig/features/base_feature.py
@@ -17,11 +17,8 @@
 from abc import ABC, abstractmethod
 from typing import Dict

-try:
-    import dask.dataframe as dd
-except ImportError:
-    pass
-import pandas as pd
+from ludwig.utils.types import DataFrame

 import tensorflow as tf

 from ludwig.constants import *
@@ -496,11 +493,11 @@ def prepare_decoder_inputs(

         return feature_hidden

-    def flatten(self, df: pd.DataFrame) -> pd.DataFrame:
+    def flatten(self, df: DataFrame) -> DataFrame:
         """ Converts the output of batch_predict to a 1D array. """
         return df

-    def unflatten(self, df: dd.DataFrame) -> dd.DataFrame:
+    def unflatten(self, df: DataFrame) -> DataFrame:
         """ Reshapes a flattened 1D array into its original shape. """
         return df

11 changes: 3 additions & 8 deletions ludwig/features/sequence_feature.py
@@ -14,14 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 # ==============================================================================
-import os

-try:
-    import dask.dataframe as dd
-except ImportError:
-    pass
 import numpy as np
-import pandas as pd

 from ludwig.constants import *
 from ludwig.decoders.sequence_decoders import DECODER_REGISTRY
@@ -47,6 +41,7 @@
 from ludwig.utils.strings_utils import build_sequence_matrix
 from ludwig.utils.strings_utils import create_vocabulary
 from ludwig.utils.strings_utils import tokenizer_registry
+from ludwig.utils.types import DataFrame

 logger = logging.getLogger(__name__)

@@ -556,12 +551,12 @@ def populate_defaults(output_feature):
         set_default_value(output_feature, 'reduce_input', SUM)
         set_default_value(output_feature, 'reduce_dependencies', SUM)

-    def flatten(self, df: pd.DataFrame) -> pd.DataFrame:
+    def flatten(self, df: DataFrame) -> DataFrame:
         probs_col = f'{self.feature_name}_{PROBABILITIES}'
         df[probs_col] = df[probs_col].apply(lambda x: x.flatten())
         return df

-    def unflatten(self, df: dd.DataFrame) -> dd.DataFrame:
+    def unflatten(self, df: DataFrame) -> DataFrame:
         probs_col = f'{self.feature_name}_{PROBABILITIES}'
         df[probs_col] = df[probs_col].apply(
             lambda x: x.reshape(-1, self.num_classes),
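The flatten/unflatten behavior for sequence (and text) outputs is unchanged; only the annotations now use the shared DataFrame alias. A small self-contained pandas illustration of the round trip, using a made-up column name and a width of 3 in place of num_classes:

import numpy as np
import pandas as pd

# Stand-in for a '<feature_name>_probabilities' column holding 2-D arrays.
df = pd.DataFrame({'seq_probabilities': [np.arange(6.0).reshape(2, 3), np.ones((1, 3))]})

flat = df.copy()
flat['seq_probabilities'] = flat['seq_probabilities'].apply(lambda x: x.flatten())

restored = flat.copy()
restored['seq_probabilities'] = restored['seq_probabilities'].apply(lambda x: x.reshape(-1, 3))

print(restored['seq_probabilities'][0].shape)  # -> (2, 3)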
12 changes: 4 additions & 8 deletions ludwig/features/text_feature.py
@@ -15,15 +15,10 @@
 # limitations under the License.
 # ==============================================================================
 import logging
-from collections.abc import Iterable

 import numpy as np
-import pandas as pd
 import tensorflow as tf
-try:
-    import dask.dataframe as dd
-except ImportError:
-    pass


 from ludwig.constants import *
 from ludwig.encoders.text_encoders import ENCODER_REGISTRY
@@ -39,6 +34,7 @@
 from ludwig.utils.strings_utils import build_sequence_matrix
 from ludwig.utils.strings_utils import create_vocabulary
 from ludwig.utils.strings_utils import tokenizer_registry
+from ludwig.utils.types import DataFrame


 logger = logging.getLogger(__name__)
@@ -513,12 +509,12 @@ def populate_defaults(output_feature):
         set_default_value(output_feature, 'level', 'word')
         SequenceOutputFeature.populate_defaults(output_feature)

-    def flatten(self, df: pd.DataFrame) -> pd.DataFrame:
+    def flatten(self, df: DataFrame) -> DataFrame:
         probs_col = f'{self.feature_name}_{PROBABILITIES}'
         df[probs_col] = df[probs_col].apply(lambda x: x.flatten())
         return df

-    def unflatten(self, df: dd.DataFrame) -> dd.DataFrame:
+    def unflatten(self, df: DataFrame) -> DataFrame:
         probs_col = f'{self.feature_name}_{PROBABILITIES}'
         df[probs_col] = df[probs_col].apply(
             lambda x: x.reshape(-1, self.max_sequence_length),
9 changes: 9 additions & 0 deletions ludwig/utils/types.py
@@ -0,0 +1,9 @@
+from typing import Union
+
+import pandas as pd
+
+try:
+    import dask.dataframe as dd
+    DataFrame = Union[pd.DataFrame, dd.DataFrame]
+except ImportError:
+    DataFrame = pd.DataFrame
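The alias lets downstream modules annotate functions that accept either a pandas or a Dask dataframe without importing dask at module load time; when dask is missing it simply degrades to pd.DataFrame. A minimal sketch with a hypothetical helper:

import pandas as pd

from ludwig.utils.types import DataFrame


def add_offset(df: DataFrame, value: int = 1) -> DataFrame:
    # assign() exists on both pandas and dask dataframes, so one annotation
    # and one body cover either backend.
    return df.assign(offset=value)


print(add_offset(pd.DataFrame({'a': [1, 2]})))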
2 changes: 1 addition & 1 deletion requirements.txt
@@ -6,7 +6,7 @@ scipy>=0.18
 tabulate>=0.7
 scikit-learn<1.0
 tqdm
-tensorflow>=2.3.1
+tensorflow>=2.3.1,<2.6.0
 tensorflow-addons==0.13.0
 PyYAML>=3.12
 absl-py
2 changes: 1 addition & 1 deletion tests/integration_tests/test_cache_manager.py
@@ -6,7 +6,7 @@

 from ludwig.constants import META, TRAINING, VALIDATION, TEST, CHECKSUM
 from ludwig.data.cache.manager import CacheManager, alphanum
-from ludwig.data.dataset import PandasDatasetManager
+from ludwig.data.dataset.pandas import PandasDatasetManager

 from tests.integration_tests.utils import sequence_feature, category_feature, LocalTestBackend