Orca: Reload DF or shard dataloader to keep consistency with PyTorch dataloader #7728

Merged
13 commits merged on Mar 23, 2023
21 changes: 15 additions & 6 deletions python/dllib/src/bigdl/dllib/utils/utils.py
@@ -201,8 +201,9 @@ def _is_scalar_type(dtype, accept_str_col=False):
return False


def convert_row_to_numpy(row, schema, feature_cols, label_cols, accept_str_col=False):
def convert_for_cols(row, cols):
def convert_row_to_numpy(row, schema, feature_cols, label_cols,
accept_str_col=False, unpack_list=False):
def convert_for_cols(row, cols, is_label=False):
import pyspark.sql.types as df_types
result = []
for name in cols:
@@ -253,16 +254,24 @@ def convert_for_cols(row, cols):
invalidInputError(isinstance(row[name], SparseVector),
"unsupported field {}, data {}".format(schema[name], row[name]))
result.append(row[name].toArray())
if len(result) == 1:
if len(result) == 1 and is_label:
return result[0]
return result

features = convert_for_cols(row, feature_cols)
# For pytorch we format multi-input as `f1, f2, label` instead of `[f1, f2], label`
Contributor
We format multi-input as f1, f2, label instead of [f1, f2], label to align with PyTorch DataLoader (a short sketch of the two layouts follows this file's diff).

# to align with PyTorch DataLoader.
if label_cols:
labels = convert_for_cols(row, label_cols)
return (features, labels)
labels = convert_for_cols(row, label_cols, True)
if unpack_list:
return (*features, labels)
else:
return (features, labels)
else:
return (features,)
if unpack_list:
return (*features,)
else:
return (features,)


def toMultiShape(shape):
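As the review comment above notes, unpack_list only changes how a converted row is shaped. Below is a minimal, self-contained sketch of the two layouts; pack_row and the sample arrays are hypothetical stand-ins, not the BigDL helper itself.

import numpy as np

def pack_row(features, label, unpack_list=False):
    # features is a list of per-column values, label is a single value
    if unpack_list:
        return (*features, label)   # flat tuple: (f1, f2, label)
    return (features, label)        # nested: ([f1, f2], label)

f1, f2, label = np.array([1.0, 2.0]), np.array([3.0]), np.array([1])
print(pack_row([f1, f2], label))                    # ([array([1., 2.]), array([3.])], array([1]))
print(pack_row([f1, f2], label, unpack_list=True))  # (array([1., 2.]), array([3.]), array([1]))

The flat form is what a plain PyTorch dataset of (f1, f2, label) tuples yields per sample, which is why the learn/utils.py diff further down slices features and label back out of the row by position.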
11 changes: 5 additions & 6 deletions python/orca/src/bigdl/orca/data/utils.py
@@ -179,9 +179,9 @@ def combine(data_list):
res = {}
for k, v in item.items():
res[k] = np.concatenate([data[k] for data in data_list], axis=0)
elif isinstance(item, list) or isinstance(item, tuple):
elif isinstance(item, (list, tuple)):
res = []
for i in range(len(data_list[0])):
for i in range(len(item)):
res.append(np.concatenate([data[i] for data in data_list], axis=0))
if isinstance(item, tuple):
res = tuple(res)
@@ -375,14 +375,13 @@ def index_data(x, i):
for k, v in x.items():
res[k] = v[i]
return res
elif isinstance(x, tuple):
return tuple(item[i] for item in x)
elif isinstance(x, list):
elif isinstance(x, (list, tuple)):
return [item[i] for item in x]
else:
invalidInputError(False,
"data should be an ndarray, a dict of ndarrays, a tuple of ndarrays"
" or a list of ndarrays, please check your input")
return []


def get_size(x):
@@ -391,7 +390,7 @@ def get_size(x):
elif isinstance(x, dict):
for k, v in x.items():
return len(v)
elif isinstance(x, tuple) or isinstance(x, list):
elif isinstance(x, (list, tuple)):
return len(x[0])
else:
invalidInputError(False,
@@ -76,7 +76,11 @@ def __len__(self):
return get_size(self.y)

def __getitem__(self, i):
return index_data(self.x, i), index_data(self.y, i)
index_data_x = index_data(self.x, i)
Contributor Author
Since we can only pass x and y as two separate parts here, multi-input has to be kept as [x1, x2] for the whole x and then unpacked in __getitem__ (see the sketch after this file's diff).

if isinstance(index_data_x, (list, tuple)):
return (*index_data_x, index_data(self.y, i))
else:
return (index_data_x, index_data(self.y, i))

params = {"batch_size": batch_size, "shuffle": True}
for arg in ["shuffle", "sampler", "batch_sampler", "num_workers", "collate_fn",
@@ -92,7 +96,7 @@ def __getitem__(self, i):
data_loader = DataLoader(dataset, **params)
return data_loader

return data_creator
return reload_dataloader_creator(data_creator)
Contributor
Why do we need to reload here?

Contributor Author
We will disable reload_dataloader_creator in the next PR; for now we just keep everything the same before we create the dataloader.

Contributor
ok



def parse_model_dir(model_dir):
Contributor
Shouldn't we remove the reload in the data_creator function if branch (in fit)?

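Here is a minimal sketch of the __getitem__ pattern above, assuming x is stored as a list of ndarrays [x1, x2] and y as a single ndarray; the dataset class and sample data are made up for illustration. Flattening the per-sample tuple lets DataLoader's default collate yield (x1_batch, x2_batch, y_batch), the same thing a plain PyTorch dataset of (x1, x2, y) tuples would produce.

import numpy as np
from torch.utils.data import Dataset, DataLoader

class MultiInputDataset(Dataset):
    def __init__(self, x, y):
        self.x, self.y = x, y            # x: list of ndarrays, y: ndarray

    def __len__(self):
        return len(self.y)

    def __getitem__(self, i):
        sample_x = [item[i] for item in self.x]
        return (*sample_x, self.y[i])    # (x1_i, x2_i, y_i), not ([x1_i, x2_i], y_i)

x = [np.random.rand(8, 4).astype("float32"), np.random.rand(8, 2).astype("float32")]
y = np.random.randint(0, 2, size=(8,))
for x1_b, x2_b, y_b in DataLoader(MultiInputDataset(x, y), batch_size=4):
    print(x1_b.shape, x2_b.shape, y_b.shape)   # [4, 4] [4, 2] [4]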
@@ -61,7 +61,11 @@ def __len__(self):
return get_size(self.y)

def __getitem__(self, i):
return index_data(self.x, i), index_data(self.y, i)
index_data_x = index_data(self.x, i)
if isinstance(index_data_x, (list, tuple)):
return (*index_data_x, index_data(self.y, i))
else:
return (index_data_x, index_data(self.y, i))

params = {"batch_size": batch_size, "shuffle": True}
for arg in ["shuffle", "sampler", "batch_sampler", "num_workers", "collate_fn",
@@ -77,7 +81,7 @@ def __getitem__(self, i):
data_loader = DataLoader(dataset, **params)
return data_loader

return data_creator
return reload_dataloader_creator(data_creator)


class PyTorchRayEstimator(BaseRayEstimator):
27 changes: 19 additions & 8 deletions python/orca/src/bigdl/orca/learn/utils.py
@@ -302,7 +302,7 @@ def init_result_lists(first_row, cols):
return [[] for r in cols]

def add_row(data, results, current):
if not isinstance(data, list) and not isinstance(data, dict):
if not isinstance(data, (list, tuple, dict)):
arrays = [data]
else:
arrays = data
@@ -318,15 +318,16 @@ def add_row(data, results, current):
feature_lists = None
label_lists = None
counter = 0
feature_tail = len(feature_cols) if feature_cols else None
Contributor
remove this


for row in iter:
if feature_lists is None:
feature_lists = init_result_lists(row[0], feature_cols)
add_row(row[0], feature_lists, counter)
feature_lists = init_result_lists(row[:feature_tail], feature_cols)
add_row(row[:feature_tail], feature_lists, counter)
if label_cols is not None:
if label_lists is None:
label_lists = init_result_lists(row[1], label_cols)
add_row(row[1], label_lists, counter)
label_lists = init_result_lists(get_label_row(row, feature_tail), label_cols)
add_row(get_label_row(row, feature_tail), label_lists, counter)
counter += 1

if shard_size and counter % shard_size == 0:
@@ -357,6 +358,13 @@ def add_row(data, results, current):
arrays2pandas = partial(arrays2others, generate_func=_generate_output_pandas_df)


def get_label_row(row, anchor):
if anchor == len(row)-1: # In case label is the last one
return row[-1]
else:
return row[anchor:]


def transform_to_shard_dict(data, feature_cols, label_cols=None):
def single_col_to_numpy(col_series, dtype):
if dtype == np.ndarray:
@@ -413,7 +421,8 @@ def _dataframe_to_xshards(data, feature_cols, label_cols=None,
schema,
feature_cols,
label_cols,
accept_str_col))
accept_str_col,
unpack_list=True))
shard_rdd = numpy_rdd.mapPartitions(lambda x: arrays2dict(x,
feature_cols,
label_cols,
@@ -442,7 +451,8 @@ def dataframe_to_xshards_of_feature_dict(data, feature_cols, label_cols=None,
schema,
feature_cols,
label_cols,
accept_str_col))
accept_str_col,
unpack_list=True))
shard_rdd = numpy_rdd.mapPartitions(lambda x: arrays2feature_dict(x,
feature_cols,
label_cols,
@@ -469,7 +479,8 @@ def dataframe_to_xshards_of_pandas_df(data, feature_cols, label_cols=None, accep
schema,
feature_cols,
label_cols,
accept_str_col))
accept_str_col,
unpack_list=True))
shard_rdd = numpy_rdd.mapPartitions(lambda x: arrays2pandas(x,
feature_cols,
label_cols,
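On the consuming side in learn/utils.py above, each row now arrives as a flat tuple, so features are recovered as the first len(feature_cols) elements and the label via get_label_row. A rough round-trip sketch under the assumption of two feature columns and one label column; split_row is a made-up name mirroring the feature_tail slicing and the label-is-last special case.

import numpy as np

def split_row(row, num_feature_cols):
    features = row[:num_feature_cols]
    if num_feature_cols == len(row) - 1:    # single label at the end
        label = row[-1]
    else:                                   # several label columns left over
        label = row[num_feature_cols:]
    return features, label

row = (np.array([1.0]), np.array([2.0, 3.0]), np.array([0]))   # (f1, f2, label)
features, label = split_row(row, num_feature_cols=2)
print(len(features), label)   # 2 [0]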