Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sample_size as a global preprocessing parameter #3650

Merged
merged 12 commits on Oct 12, 2023
Merged
9 changes: 9 additions & 0 deletions ludwig/data/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,15 @@ def build_dataset(
logger.debug(f"sample {sample_ratio} of data")
dataset_df = dataset_df.sample(frac=sample_ratio, random_state=random_seed)

sample_cap = global_preprocessing_parameters["sample_cap"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you refactor this section out into a separate function?

dataset_df = get_sampled_dataset_df(dataset_df, sample_ratio, sample_cap)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. Done!

if sample_cap:
if sample_ratio < 1.0:
raise ValueError("sample_cap cannot be used when sample_ratio < 1.0")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we can push this up into a schema validation check, i.e., if preprocessing sample_ratio is specified and it is < 1 and sample_cap is also specified, then raise a ConfigValidationError?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. If we can implement this as an auxiliary validation, that would allow the config to fail as early as possible.

if sample_cap < len(dataset_df):
dataset_df = dataset_df.sample(n=sample_cap, random_state=random_seed)
else:
logger.info("sample_cap is larger than dataset size, ignoring sample_cap")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps logger.warning?


# If persisting DataFrames in memory is enabled, we want to do this after
# each batch of parallel ops in order to avoid redundant computation
dataset_df = df_engine.persist(dataset_df)
Expand Down
7 changes: 5 additions & 2 deletions ludwig/explain/captum.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,11 @@ def get_input_tensors(

:return: A list of variables, one for each input feature. Shape of each variable is [batch size, embedding size].
"""
# Ignore sample_ratio from the model config, since we want to explain all the data.
# Ignore sample_ratio and sample_cap from the model config, since we want to explain all the data.
sample_ratio_bak = model.config_obj.preprocessing.sample_ratio
sample_cap_bak = model.config_obj.preprocessing.sample_cap
model.config_obj.preprocessing.sample_ratio = 1.0
model.config_obj.preprocessing.sample_cap = None

config = model.config_obj.to_dict()
training_set_metadata = copy.deepcopy(model.training_set_metadata)
Expand All @@ -302,8 +304,9 @@ def get_input_tensors(
callbacks=model.callbacks,
)

# Restore sample_ratio
# Restore sample_ratio and sample_cap
model.config_obj.preprocessing.sample_ratio = sample_ratio_bak
model.config_obj.preprocessing.sample_cap = sample_cap_bak

# Make sure the number of rows in the preprocessed dataset matches the number of rows in the input data
assert (
Expand Down
16 changes: 16 additions & 0 deletions ludwig/schema/metadata/configs/preprocessing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ sample_ratio:
expected_impact: 2
suggested_values: Depends on data size
ui_display_name: Sample Ratio
sample_cap:
default_value_reasoning:
The default value is None because we do not want to shrink
the dataset by default, and we do not know the size of an arbitrary dataset.
By setting the default to None, we fall back on the sample_ratio to determine
the size of the dataset.
description_implications:
Decreases the amount of data you are inputting into
the model. Could be useful if you have more data than you need and you are
concerned with computational costs. More useful than sample_ratio if you
know the exact number of samples you want to train on instead of knowing the proportion.
example_value:
- 1000
expected_impact: 2
suggested_values: Depends on data size
ui_display_name: Sample Cap
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Sample Size.

column:
expected_impact: 3
ui_display_name: Split Column
Expand Down
8 changes: 8 additions & 0 deletions ludwig/schema/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ class PreprocessingConfig(schema_utils.BaseMarshmallowConfig):
parameter_metadata=PREPROCESSING_METADATA["sample_ratio"],
)

# Maximum number of rows to keep from the dataset; None disables the cap.
# Annotated as int (not float) to match the NonNegativeInteger schema field.
sample_cap: int = schema_utils.NonNegativeInteger(
    default=None,
    allow_none=True,
    description="The maximum number of samples from the dataset to use. Cannot be set if sample_ratio is set to be "
    "< 1.0. If sample_ratio is set to 1.0, this will override the number of samples to use.",
    parameter_metadata=PREPROCESSING_METADATA["sample_cap"],
)

oversample_minority: float = schema_utils.NonNegativeFloat(
default=None,
allow_none=True,
Expand Down
102 changes: 102 additions & 0 deletions tests/integration_tests/test_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,108 @@ def test_sample_ratio_deterministic(backend, tmpdir, ray_cluster_2cpu):
assert test_set_1.to_df().compute().equals(test_set_2.to_df().compute())


@pytest.mark.parametrize(
    "backend",
    [
        pytest.param("local", id="local"),
        pytest.param("ray", id="ray", marks=pytest.mark.distributed),
    ],
)
def test_sample_cap(backend, tmpdir, ray_cluster_2cpu):
    """Verify that sample_cap limits the preprocessed dataset to the requested row count.

    Also checks that the cap is ignored when preprocessing for prediction, so the
    full dataset is available at inference time.
    """
    num_examples = 100
    sample_cap = 25

    input_features = [sequence_feature(encoder={"reduce_output": "sum"}), audio_feature(folder=tmpdir)]
    output_features = [category_feature(decoder={"vocab_size": 5}, reduce_input="sum")]
    data_csv = generate_data(
        input_features, output_features, os.path.join(tmpdir, "dataset.csv"), num_examples=num_examples
    )
    config = {
        INPUT_FEATURES: input_features,
        OUTPUT_FEATURES: output_features,
        TRAINER: {
            EPOCHS: 2,
        },
        PREPROCESSING: {"sample_cap": sample_cap},
    }

    model = LudwigModel(config, backend=backend)
    train_set, val_set, test_set, training_set_metadata = model.preprocess(
        data_csv,
        skip_save_processed_input=True,
    )

    # The cap is applied before splitting, so all splits together should hold
    # exactly sample_cap rows.
    count = len(train_set) + len(val_set) + len(test_set)
    assert sample_cap == count

    # Check that sample cap is disabled when doing preprocessing for prediction
    dataset, _ = preprocess_for_prediction(
        model.config_obj.to_dict(),
        dataset=data_csv,
        training_set_metadata=training_set_metadata,
        split=FULL,
        include_outputs=True,
        backend=model.backend,
    )
    # sample_cap stays in the config, but prediction preprocessing ignores it.
    assert "sample_cap" in model.config_obj.preprocessing.to_dict()
    assert len(dataset) == num_examples


@pytest.mark.parametrize(
    "backend",
    [
        pytest.param("local", id="local"),
        pytest.param("ray", id="ray", marks=pytest.mark.distributed),
    ],
)
def test_sample_cap_deterministic(backend, tmpdir, ray_cluster_2cpu):
    """Ensures that the sampled dataset is the same when using a random seed.

    model.preprocess returns a PandasDataset object when using the local backend,
    and a RayDataset object when using the Ray backend.
    """
    num_examples = 100
    sample_cap = 30

    input_features = [binary_feature()]
    output_features = [category_feature()]
    data_csv = generate_data(
        input_features, output_features, os.path.join(tmpdir, "dataset.csv"), num_examples=num_examples
    )

    config = {
        INPUT_FEATURES: input_features,
        OUTPUT_FEATURES: output_features,
        PREPROCESSING: {"sample_cap": sample_cap},
    }

    # Preprocess the same CSV twice with two independent models; the fixed random
    # seed should produce identical sampled splits both times.
    def _preprocess_splits():
        model = LudwigModel(config, backend=backend)
        train_split, val_split, test_split, _ = model.preprocess(
            data_csv,
            skip_save_processed_input=True,
        )
        return train_split, val_split, test_split

    splits_first = _preprocess_splits()
    splits_second = _preprocess_splits()

    # Ensure sizes are the same
    assert sample_cap == sum(len(split) for split in splits_first)
    assert sample_cap == sum(len(split) for split in splits_second)

    # Ensure actual rows are the same
    for split_a, split_b in zip(splits_first, splits_second):
        if backend == "local":
            assert split_a.to_df().equals(split_b.to_df())
        else:
            assert split_a.to_df().compute().equals(split_b.to_df().compute())


def test_strip_whitespace_category(csv_filename, tmpdir):
data_csv_path = os.path.join(tmpdir, csv_filename)

Expand Down
Loading