refactor: support sourcing images from either file path or in-memory data frame #1174

Merged
27 commits
2d2f3d0
refactor: rename key image related variable names
jimthompson5802 May 4, 2021
e82bf75
refactor: code to now accept numpy arrays
jimthompson5802 May 4, 2021
0d2a16c
test: add unit test for ndarray images feature
jimthompson5802 May 5, 2021
b5453cf
test: add in-memory parameter for unit test
jimthompson5802 May 5, 2021
9d85020
test: add skip_save_processed_input option to test case
jimthompson5802 May 5, 2021
9b6d638
test: insert additional if test for file test
jimthompson5802 May 7, 2021
d836e72
refactor: incorporated reviewer comments
jimthompson5802 May 8, 2021
1c6edbb
Merge branch 'master' into support_sourcing_images_from_memory
jimthompson5802 May 8, 2021
1656dae
doc: update image test comments
jimthompson5802 May 8, 2021
ce54d1c
refactor: add assert on response status code
jimthompson5802 May 8, 2021
9ebb9b9
refactor: prep for adding ndarray test
jimthompson5802 May 8, 2021
96e44e9
test: adapt test_server unit test to accept ndarray
jimthompson5802 May 9, 2021
419ed2c
test: add skip_save_processed_input parameter
jimthompson5802 May 9, 2021
3fc7fa4
refactor: major refactor of REST API serialize/deserialize functions
jimthompson5802 May 15, 2021
f534409
refactor: incorporate reviewer comments & misc cleanup
jimthompson5802 May 15, 2021
e01f0a1
refactor: code clean up
jimthompson5802 May 16, 2021
d1aefd4
Merge branch 'master' into support_sourcing_images_from_memory
jimthompson5802 May 16, 2021
42317aa
doc: update docstring
jimthompson5802 May 16, 2021
15aa133
doc: minor edits in docstring
jimthompson5802 May 16, 2021
d6166e0
refactor: remove obsolete code
jimthompson5802 May 16, 2021
4ef7051
refactor: model server unit test
jimthompson5802 May 17, 2021
f73bc9b
refactor: rename variable to be consistent
jimthompson5802 May 17, 2021
efcb466
refactor: modify test for dask backend and use of hdf5 cache
jimthompson5802 May 18, 2021
3e2a2ce
refactor: re-introduce _write_file() function
jimthompson5802 May 20, 2021
7769afe
test: split out single vs multiple record audio unit test
jimthompson5802 May 20, 2021
3174467
refactor: short-term fix for Issue #1181
jimthompson5802 May 20, 2021
a1d4a22
Merge remote-tracking branch 'upstream/master' into support_sourcing_images_from_memory
jimthompson5802 May 22, 2021
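
In short, this PR lets an image input feature be sourced either from file paths or from numpy arrays held directly in the dataset DataFrame. A minimal sketch of the new usage, assuming the standard LudwigModel API; the column names, config, and random data below are illustrative and not taken from the PR's tests:

import numpy as np
import pandas as pd
from ludwig.api import LudwigModel

config = {
    'input_features': [{'name': 'image_col', 'type': 'image'}],
    'output_features': [{'name': 'label', 'type': 'binary'}],
}

# Each image entry is an in-memory H x W x C uint8 array instead of a path on disk.
df = pd.DataFrame({
    'image_col': [np.random.randint(0, 255, (32, 32, 3), dtype=np.uint8)
                  for _ in range(10)],
    'label': [i % 2 == 0 for i in range(10)],
})

model = LudwigModel(config)
# skip_save_processed_input avoids writing the processed images to the hdf5 cache.
model.train(dataset=df, skip_save_processed_input=True)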
8 changes: 5 additions & 3 deletions ludwig/backend/dask.py
@@ -16,7 +16,7 @@
# ==============================================================================

from ludwig.backend.base import Backend, LocalTrainingMixin
from ludwig.constants import NAME, PARQUET
from ludwig.constants import NAME, PARQUET, PREPROCESSING
from ludwig.data.dataframe.dask import DaskEngine
from ludwig.data.dataset.partitioned import PartitionedDataset
from ludwig.models.predictor import BasePredictor, Predictor, get_output_columns
@@ -105,5 +105,7 @@ def supports_multiprocessing(self):
return False

def check_lazy_load_supported(self, feature):
raise ValueError(f'DaskBackend does not support lazy loading of data files at train time. '
f'Set preprocessing config `in_memory: True` for feature {feature[NAME]}')
if not feature[PREPROCESSING]['in_memory']:
raise ValueError(
f'DaskBackend does not support lazy loading of data files at train time. '
f'Set preprocessing config `in_memory: True` for feature {feature[NAME]}')
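
The dask.py change narrows the guard: the backend now rejects lazy loading only when a feature's preprocessing explicitly sets in_memory: False, instead of raising unconditionally. A standalone sketch of the reworked logic (a plain function rather than the backend method, with the constants inlined):

# Sketch of the guard now performed by DaskBackend.check_lazy_load_supported.
NAME, PREPROCESSING = 'name', 'preprocessing'

def check_lazy_load_supported(feature):
    if not feature[PREPROCESSING]['in_memory']:
        raise ValueError(
            f'DaskBackend does not support lazy loading of data files at train time. '
            f'Set preprocessing config `in_memory: True` for feature {feature[NAME]}')

check_lazy_load_supported({NAME: 'image_col', PREPROCESSING: {'in_memory': True}})    # passes
# check_lazy_load_supported({NAME: 'image_col', PREPROCESSING: {'in_memory': False}}) # raises ValueError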
18 changes: 14 additions & 4 deletions ludwig/data/preprocessing.py
@@ -1047,6 +1047,7 @@ def build_dataset(
metadata=None,
backend=LOCAL_BACKEND,
random_seed=default_random_seed,
skip_save_processed_input=False
):
df_engine = backend.df_engine
dataset_df = df_engine.parallelize(dataset_df)
@@ -1075,7 +1076,8 @@
dataset_df,
features,
metadata,
backend
backend,
skip_save_processed_input
)

dataset[SPLIT] = get_split(
@@ -1187,7 +1189,13 @@ def build_metadata(
return metadata


def build_data(input_df, features, training_set_metadata, backend):
def build_data(
input_df,
features,
training_set_metadata,
backend,
skip_save_processed_input
):
proc_df = backend.df_engine.empty_df_like(input_df)
for feature in features:

@@ -1213,7 +1221,8 @@ def build_data(input_df, features, training_set_metadata, backend):
proc_df,
training_set_metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
)

return proc_df
@@ -1512,7 +1521,8 @@ def _preprocess_file_for_training(
preprocessing_params,
metadata=training_set_metadata,
backend=backend,
random_seed=random_seed
random_seed=random_seed,
skip_save_processed_input=skip_save_processed_input
)

if backend.is_coordinator() and not skip_save_processed_input:
3 changes: 2 additions & 1 deletion ludwig/features/audio_feature.py
@@ -284,7 +284,8 @@ def add_feature_data(
proc_df,
metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
):
set_default_value(
feature['preprocessing'],
3 changes: 2 additions & 1 deletion ludwig/features/bag_feature.py
@@ -85,7 +85,8 @@ def add_feature_data(
proc_df,
metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
):
proc_df[feature[PROC_COLUMN]] = BagFeatureMixin.feature_data(
input_df[feature[COLUMN]].astype(str),
3 changes: 2 additions & 1 deletion ludwig/features/binary_feature.py
@@ -80,7 +80,8 @@ def add_feature_data(
proc_df,
metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
):
column = input_df[feature[COLUMN]]

3 changes: 2 additions & 1 deletion ludwig/features/category_feature.py
@@ -89,7 +89,8 @@ def add_feature_data(
proc_df,
metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
):
proc_df[feature[PROC_COLUMN]] = CategoryFeatureMixin.feature_data(
input_df[feature[COLUMN]].astype(str),
3 changes: 2 additions & 1 deletion ludwig/features/date_feature.py
@@ -103,7 +103,8 @@ def add_feature_data(
proc_df,
metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
):
datetime_format = preprocessing_parameters['datetime_format']
proc_df[feature[PROC_COLUMN]] = backend.df_engine.map_objects(
3 changes: 2 additions & 1 deletion ludwig/features/h3_feature.py
@@ -70,7 +70,8 @@ def add_feature_data(
proc_df,
metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
):
column = input_df[feature[COLUMN]]
if column.dtype == object:
103 changes: 69 additions & 34 deletions ludwig/features/image_feature.py
@@ -19,6 +19,7 @@
import sys
from functools import partial
from multiprocessing import Pool
from typing import Union

import numpy as np
import tensorflow as tf
@@ -58,16 +59,17 @@ def get_feature_meta(column, preprocessing_parameters, backend):

@staticmethod
def _read_image_and_resize(
filepath,
img_width,
img_height,
should_resize,
num_channels,
resize_method,
user_specified_num_channels
img_entry: Union[str, 'numpy.array'],
img_width: int,
img_height: int,
should_resize: bool,
num_channels: int,
resize_method: str,
user_specified_num_channels: int
):
"""
:param filepath: path to the image
:param img_entry Union[str, 'numpy.array']: if str file path to the
image else numpy.array of the image itself
:param img_width: expected width of the image
:param img_height: expected height of the image
:param should_resize: Should the image be resized?
@@ -95,7 +97,10 @@ def _read_image_and_resize(
)
sys.exit(-1)

img = imread(filepath)
if isinstance(img_entry, str):
img = imread(img_entry)
else:
img = img_entry
img_num_channels = num_channels_in_image(img)
if img_num_channels == 1:
img = img.reshape((img.shape[0], img.shape[1], 1))
@@ -120,19 +125,18 @@

if img_num_channels != num_channels:
logger.warning(
"Image {0} has {1} channels, where as {2} "
"Image has {0} channels, where as {1} "
"channels are expected. Dropping/adding channels "
"with 0s as appropriate".format(
filepath, img_num_channels, num_channels))
img_num_channels, num_channels))
else:
# If the image isn't like the first image, raise exception
if img_num_channels != num_channels:
raise ValueError(
'Image {0} has {1} channels, unlike the first image, which '
'has {2} channels. Make sure all the images have the same '
'Image has {0} channels, unlike the first image, which '
'has {1} channels. Make sure all the images have the same '
'number of channels or use the num_channels property in '
'image preprocessing'.format(filepath,
img_num_channels,
'image preprocessing'.format(img_num_channels,
num_channels))

if img.shape[0] != img_height or img.shape[1] != img_width:
@@ -152,8 +156,8 @@

@staticmethod
def _finalize_preprocessing_parameters(
preprocessing_parameters,
first_image_path
preprocessing_parameters: dict,
first_img_entry: Union[str, 'numpy.array']
):
"""
Helper method to determine the height, width and number of channels for
@@ -174,7 +178,10 @@ def _finalize_preprocessing_parameters(
)
sys.exit(-1)

first_image = imread(first_image_path)
if isinstance(first_img_entry, str):
first_image = imread(first_img_entry)
else:
first_image = first_img_entry
first_img_height = first_image.shape[0]
first_img_width = first_image.shape[1]
first_img_num_channels = num_channels_in_image(first_image)
@@ -231,7 +238,8 @@ def add_feature_data(
proc_df,
metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
):
in_memory = preprocessing_parameters['in_memory']
if PREPROCESSING in feature and 'in_memory' in feature[PREPROCESSING]:
@@ -250,12 +258,23 @@
if num_images == 0:
raise ValueError('There are no images in the dataset provided.')

first_path = next(iter(input_df[feature[COLUMN]]))
first_img_entry = next(iter(input_df[feature[COLUMN]]))
logger.debug(
'Detected image feature type is {}'.format(type(first_img_entry))
)

if src_path is None and not os.path.isabs(first_path):
raise ValueError('Image file paths must be absolute')
if not isinstance(first_img_entry, str) \
and not isinstance(first_img_entry, np.ndarray):
raise ValueError(
'Invalid image feature data type. Detected type is {}, '
'expect either string for file path or numpy array.'
.format(type(first_img_entry))
)

first_path = get_abs_path(src_path, first_path)
if isinstance(first_img_entry, str):
if src_path is None and not os.path.isabs(first_img_entry):
raise ValueError('Image file paths must be absolute')
first_img_entry = get_abs_path(src_path, first_img_entry)

(
should_resize,
@@ -265,7 +284,7 @@
user_specified_num_channels,
first_image
) = ImageFeatureMixin._finalize_preprocessing_parameters(
preprocessing_parameters, first_path
preprocessing_parameters, first_img_entry
)

metadata[feature[NAME]][PREPROCESSING]['height'] = height
@@ -283,7 +302,11 @@
user_specified_num_channels=user_specified_num_channels
)

if in_memory:
# check to see if the active backend can support lazy loading of
# image features from the hdf5 cache.
backend.check_lazy_load_supported(feature)

if in_memory or skip_save_processed_input:
# Number of processes to run in parallel for preprocessing
metadata[feature[NAME]][PREPROCESSING][
'num_processes'] = num_processes
@@ -294,32 +317,44 @@
# standard code anyway.
if backend.supports_multiprocessing and (
num_processes > 1 or num_images > 1):
all_file_paths = [get_abs_path(src_path, file_path)
for file_path in input_df[feature[NAME]]]
all_img_entries = [get_abs_path(src_path, img_entry)
if isinstance(img_entry, str) else img_entry
for img_entry in input_df[feature[COLUMN]]]

with Pool(num_processes) as pool:
logger.debug(
'Using {} processes for preprocessing images'.format(
num_processes
)
)
proc_df[feature[PROC_COLUMN]] = pool.map(read_image_and_resize, all_file_paths)
proc_df[feature[PROC_COLUMN]] = pool.map(
read_image_and_resize, all_img_entries
)
else:
# If we're not running multiple processes and we are only processing one
# image just use this faster shortcut, bypassing multiprocessing.Pool.map
logger.debug(
'No process pool initialized. Using internal process for preprocessing images'
)

# helper function for handling single image
def _get_processed_image(img_store):
if isinstance(img_store, str):
return read_image_and_resize(
get_abs_path(src_path, img_store)
)
else:
return read_image_and_resize(img_store)

proc_df[feature[PROC_COLUMN]] = backend.df_engine.map_objects(
input_df[feature[COLUMN]],
lambda file_path: read_image_and_resize(get_abs_path(src_path, file_path))
_get_processed_image
)
else:
backend.check_lazy_load_supported(feature)

all_file_paths = [get_abs_path(src_path, file_path)
for file_path in input_df[feature[NAME]]]
all_img_entries = [get_abs_path(src_path, img_entry)
if isinstance(img_entry, str) else img_entry
for img_entry in input_df[feature[COLUMN]]]

data_fp = backend.cache.get_cache_path(
input_df.src, metadata.get(CHECKSUM), TRAINING
@@ -331,9 +366,9 @@ def add_feature_data(
(num_images, height, width, num_channels),
dtype=np.uint8
)
for i, filepath in enumerate(all_file_paths):
for i, img_entry in enumerate(all_img_entries):
image_dataset[i, :height, :width, :] = (
read_image_and_resize(filepath)
read_image_and_resize(img_entry)
)
h5_file.flush()

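Taken together, the image_feature.py changes amount to a str-vs-ndarray dispatch: path entries are resolved against the dataset's src_path and read from disk, while in-memory arrays are used as-is. A standalone sketch with illustrative helper names (resolve_entry and read_entry are not Ludwig functions; imread here is scikit-image's, which may differ from Ludwig's own image_utils helper):

import os
from typing import Union

import numpy as np
from skimage.io import imread

def resolve_entry(src_path: str, img_entry: Union[str, np.ndarray]) -> Union[str, np.ndarray]:
    # File paths are made absolute relative to the dataset's source directory;
    # in-memory arrays pass through untouched.
    if isinstance(img_entry, str) and not os.path.isabs(img_entry):
        return os.path.join(src_path, img_entry)
    return img_entry

def read_entry(img_entry: Union[str, np.ndarray]) -> np.ndarray:
    # Only file paths need to be read from disk.
    return imread(img_entry) if isinstance(img_entry, str) else img_entry
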
3 changes: 2 additions & 1 deletion ludwig/features/numerical_feature.py
@@ -70,7 +70,8 @@ def add_feature_data(
proc_df,
metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
):
proc_df[feature[PROC_COLUMN]] = input_df[feature[COLUMN]].astype(
np.float32).values
3 changes: 2 additions & 1 deletion ludwig/features/sequence_feature.py
@@ -114,7 +114,8 @@ def add_feature_data(
proc_df,
metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
):
sequence_data = SequenceInputFeature.feature_data(
input_df[feature[COLUMN]].astype(str),
3 changes: 2 additions & 1 deletion ludwig/features/set_feature.py
@@ -89,7 +89,8 @@ def add_feature_data(
proc_df,
metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
):
proc_df[feature[PROC_COLUMN]] = SetFeatureMixin.feature_data(
input_df[feature[COLUMN]].astype(str),
3 changes: 2 additions & 1 deletion ludwig/features/text_feature.py
@@ -215,7 +215,8 @@ def add_feature_data(
proc_df,
metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
):
chars_data, words_data = TextFeatureMixin.feature_data(
input_df[feature[COLUMN]].astype(str),
3 changes: 2 additions & 1 deletion ludwig/features/timeseries_feature.py
@@ -127,7 +127,8 @@ def add_feature_data(
proc_df,
metadata,
preprocessing_parameters,
backend
backend,
skip_save_processed_input
):
proc_df[feature[PROC_COLUMN]] = TimeseriesFeatureMixin.feature_data(
input_df[feature[COLUMN]].astype(str),