Commit

Adds regression tests for #2020 (#2021)
* fixes nans in dask df engine

* adds tests

* fixes with logs

* fixes

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* cleanup

* checking accuracy closeness

* investigates ray batcher dropping samples with logs

* clean up for PR review

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* cleanup

* add missing test param

* updated sampling to nan_percent% of rows in each col

* cleanup

Co-authored-by: Geoffrey Angus <geoffrey@predibase.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
3 people committed May 11, 2022
1 parent 2c3fe2e commit 9ae57a9
Showing 5 changed files with 47 additions and 24 deletions.
2 changes: 2 additions & 0 deletions ludwig/data/dataframe/dask.py
@@ -50,6 +50,8 @@ def df_like(self, df: dd.DataFrame, proc_cols: Dict[str, dd.Series]):
# we need to drop it immediately following creation.
dataset = df.index.to_frame(name=TMP_COLUMN).drop(columns=[TMP_COLUMN])
# TODO: address if following results in fragmented DataFrame
# TODO: see if we can get divisions. concat (instead of iterative join) should work if divs are known. Source:
# https://github.com/dask/dask/blob/5fbda77cfc5bc1b8f1453a2dbb034b048fc10726/dask/dataframe/multi.py#L1245
for col_name, col in proc_cols.items():
col.name = col_name
dataset = dataset.join(col, how="inner") # inner join handles Series with dropped rows
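The TODO in this hunk points at replacing the iterative inner joins with a single concat once divisions are known. A rough, standalone sketch of that idea (toy data and names invented for illustration, not part of this commit):

import dask.dataframe as dd
import pandas as pd

# Build a small frame whose divisions are known (default RangeIndex).
pdf = pd.DataFrame({"a": [1.0, 2.0, 3.0, 4.0]})
ddf = dd.from_pandas(pdf, npartitions=2)

col_a = ddf["a"]
col_b = ddf["a"] * 10
col_b.name = "b"

# With known, matching divisions an axis-1 concat aligns the series without
# a shuffle; with unknown divisions this alignment is not guaranteed.
dataset = dd.concat([col_a, col_b], axis=1)
print(dataset.compute())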
15 changes: 13 additions & 2 deletions ludwig/data/dataframe/pandas.py
@@ -24,8 +24,19 @@ def __init__(self, **kwargs):
super().__init__()

def df_like(self, df, proc_cols):
# df argument unused for pandas, which can instantiate df directly
return pd.DataFrame(proc_cols)
# Assemble the processed columns with an inner concat so that rows dropped
# from any individual column are dropped from the final dataframe instead
# of becoming NaNs.
col_names, cols = zip(*proc_cols.items())
series_cols = []
for col in cols:
if type(col) not in {pd.Series, pd.DataFrame}:
series_cols.append(pd.Series(col))
else:
series_cols.append(col)
dataset = pd.concat(series_cols, join="inner", axis=1) # inner join handles Series with dropped rows
dataset.columns = col_names
return dataset

def parallelize(self, data):
return data
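For context, a minimal standalone illustration (toy data, not part of this commit) of why the inner concat matters here: rows dropped from any processed column are dropped from the assembled dataset instead of turning into NaNs.

import pandas as pd

full = pd.Series([1.0, 2.0, 3.0], index=[0, 1, 2])
partial = pd.Series([10.0, 30.0], index=[0, 2])  # row 1 was dropped upstream

# The inner concat keeps only the index intersection, i.e. rows 0 and 2.
dataset = pd.concat([full, partial], join="inner", axis=1)
dataset.columns = ["a", "b"]
print(dataset)  # an outer join would have left a NaN in column "b" at row 1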
9 changes: 2 additions & 7 deletions tests/integration_tests/test_missing_value_strategy.py
@@ -27,6 +27,7 @@
generate_data,
LocalTestBackend,
number_feature,
read_csv_with_nan,
sequence_feature,
set_feature,
text_feature,
@@ -86,13 +87,7 @@ def test_missing_values_drop_rows(csv_filename, tmpdir):
config = {"input_features": input_features, "output_features": output_features, TRAINER: {"epochs": 2}}

training_data_csv_path = generate_data(input_features, output_features, data_csv_path)
df = pd.read_csv(training_data_csv_path)

# set 10% of values to NaN
nan_percent = 0.1
ix = [(row, col) for row in range(df.shape[0]) for col in range(df.shape[1])]
for row, col in random.sample(ix, int(round(nan_percent * len(ix)))):
df.iat[row, col] = np.nan
df = read_csv_with_nan(training_data_csv_path, nan_percent=0.1)

# run preprocessing
ludwig_model = LudwigModel(config, backend=backend)
6 changes: 4 additions & 2 deletions tests/integration_tests/test_ray.py
@@ -156,6 +156,7 @@ def run_test_with_features(
df_engine=None,
dataset_type="parquet",
skip_save_processed_input=True,
nan_percent=0.0,
):
with ray_start(num_cpus=num_cpus, num_gpus=num_gpus):
config = {
@@ -172,7 +173,7 @@
with tempfile.TemporaryDirectory() as tmpdir:
csv_filename = os.path.join(tmpdir, "dataset.csv")
dataset_csv = generate_data(input_features, output_features, csv_filename, num_examples=num_examples)
dataset = create_data_set_to_use(dataset_type, dataset_csv)
dataset = create_data_set_to_use(dataset_type, dataset_csv, nan_percent=nan_percent)

if expect_error:
with pytest.raises(ValueError):
@@ -191,7 +192,7 @@
)


@pytest.mark.parametrize("dataset_type", ["parquet", "csv"])
@pytest.mark.parametrize("dataset_type", ["csv", "parquet"])
@pytest.mark.distributed
def test_ray_save_processed_input(dataset_type):
input_features = [
@@ -206,6 +207,7 @@ def test_ray_save_processed_input(dataset_type):
df_engine="dask",
dataset_type=dataset_type,
skip_save_processed_input=False,
nan_percent=0.1,
)


39 changes: 26 additions & 13 deletions tests/integration_tests/utils.py
@@ -520,7 +520,18 @@ def run_api_experiment(input_features, output_features, data_csv):
shutil.rmtree(output_dir, ignore_errors=True)


def create_data_set_to_use(data_format, raw_data):
def read_csv_with_nan(path, nan_percent=0.0):
"""Converts `nan_percent` of samples in each row of the CSV at `path` to NaNs."""
df = pd.read_csv(path)
if nan_percent > 0:
num_rows = len(df)
for col in df.columns:
for row in random.sample(range(num_rows), int(round(nan_percent * num_rows))):
df[col].iloc[row] = np.nan
return df


def create_data_set_to_use(data_format, raw_data, nan_percent=0.0):
# helper function for generating training and test data with specified format
# handles all data formats except for hdf5
# assumes raw_data is a csv dataset generated by
@@ -539,56 +550,58 @@ def to_fwf(df, fname):
dataset_to_use = None

if data_format == "csv":
# Replace the original CSV with a CSV with NaNs
dataset_to_use = raw_data
read_csv_with_nan(raw_data, nan_percent=nan_percent).to_csv(dataset_to_use, index=False)

elif data_format in {"df", "dict"}:
dataset_to_use = pd.read_csv(raw_data)
dataset_to_use = read_csv_with_nan(raw_data, nan_percent=nan_percent)
if data_format == "dict":
dataset_to_use = dataset_to_use.to_dict(orient="list")

elif data_format == "excel":
dataset_to_use = replace_file_extension(raw_data, "xlsx")
pd.read_csv(raw_data).to_excel(dataset_to_use, index=False)
read_csv_with_nan(raw_data, nan_percent=nan_percent).to_excel(dataset_to_use, index=False)

elif data_format == "excel_xls":
dataset_to_use = replace_file_extension(raw_data, "xls")
pd.read_csv(raw_data).to_excel(dataset_to_use, index=False)
read_csv_with_nan(raw_data, nan_percent=nan_percent).to_excel(dataset_to_use, index=False)

elif data_format == "feather":
dataset_to_use = replace_file_extension(raw_data, "feather")
pd.read_csv(raw_data).to_feather(dataset_to_use)
read_csv_with_nan(raw_data, nan_percent=nan_percent).to_feather(dataset_to_use)

elif data_format == "fwf":
dataset_to_use = replace_file_extension(raw_data, "fwf")
pd.read_csv(raw_data).to_fwf(dataset_to_use)
read_csv_with_nan(raw_data, nan_percent=nan_percent).to_fwf(dataset_to_use)

elif data_format == "html":
dataset_to_use = replace_file_extension(raw_data, "html")
pd.read_csv(raw_data).to_html(dataset_to_use, index=False)
read_csv_with_nan(raw_data, nan_percent=nan_percent).to_html(dataset_to_use, index=False)

elif data_format == "json":
dataset_to_use = replace_file_extension(raw_data, "json")
pd.read_csv(raw_data).to_json(dataset_to_use, orient="records")
read_csv_with_nan(raw_data, nan_percent=nan_percent).to_json(dataset_to_use, orient="records")

elif data_format == "jsonl":
dataset_to_use = replace_file_extension(raw_data, "jsonl")
pd.read_csv(raw_data).to_json(dataset_to_use, orient="records", lines=True)
read_csv_with_nan(raw_data, nan_percent=nan_percent).to_json(dataset_to_use, orient="records", lines=True)

elif data_format == "parquet":
dataset_to_use = replace_file_extension(raw_data, "parquet")
pd.read_csv(raw_data).to_parquet(dataset_to_use, index=False)
read_csv_with_nan(raw_data, nan_percent=nan_percent).to_parquet(dataset_to_use, index=False)

elif data_format == "pickle":
dataset_to_use = replace_file_extension(raw_data, "pickle")
pd.read_csv(raw_data).to_pickle(dataset_to_use)
read_csv_with_nan(raw_data, nan_percent=nan_percent).to_pickle(dataset_to_use)

elif data_format == "stata":
dataset_to_use = replace_file_extension(raw_data, "stata")
pd.read_csv(raw_data).to_stata(dataset_to_use)
read_csv_with_nan(raw_data, nan_percent=nan_percent).to_stata(dataset_to_use)

elif data_format == "tsv":
dataset_to_use = replace_file_extension(raw_data, "tsv")
pd.read_csv(raw_data).to_csv(dataset_to_use, sep="\t", index=False)
read_csv_with_nan(raw_data, nan_percent=nan_percent).to_csv(dataset_to_use, sep="\t", index=False)

else:
raise ValueError(f"'{data_format}' is an unrecognized data format")
