Ensure no ghost ray instances are running in tests (#2607)
* Add test fixtures to prevent double initialization
* Contain dataframe utils tests in Ray test fixture
* Add fixture for training determinism test
* Fix typo in docstring
arnavgarg1 committed Oct 14, 2022
1 parent e200e98 commit 35a6dc7
Showing 6 changed files with 20 additions and 39 deletions.
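
The ray_cluster_2cpu and ray_cluster_4cpu fixtures that the changed tests now request are not defined in this diff; they presumably live in the shared test conftest. A minimal sketch of what they could look like, assuming they wrap ray.init()/ray.shutdown() with the same arguments as the ray_start helper this commit removes (the _ray_cluster helper name and the module scope are assumptions, not the repository's actual code):

    # Hypothetical sketch; init arguments mirror the removed ray_start helper.
    import pytest
    import ray


    def _ray_cluster(num_cpus):
        """Start a local Ray cluster, yield it, and always shut it down."""
        res = ray.init(
            num_cpus=num_cpus,
            include_dashboard=False,
            object_store_memory=150 * 1024 * 1024,
        )
        try:
            yield res
        finally:
            ray.shutdown()  # teardown is what prevents ghost Ray instances


    @pytest.fixture(scope="module")  # scope is an assumption
    def ray_cluster_2cpu():
        yield from _ray_cluster(num_cpus=2)


    @pytest.fixture(scope="module")  # scope is an assumption
    def ray_cluster_4cpu():
        yield from _ray_cluster(num_cpus=4)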
tests/conftest.py: 4 additions & 2 deletions

@@ -70,7 +70,8 @@ def yaml_filename():
 
 
 @pytest.fixture(scope="module")
-def hyperopt_results_single_parameter():
+def hyperopt_results_single_parameter(ray_cluster_4cpu):
+    """This fixture is used by hyperopt visualization tests in test_visualization_api.py."""
     config, rel_path = _get_sample_config()
     config[HYPEROPT] = {
         "parameters": {
@@ -97,7 +98,8 @@ def hyperopt_results_single_parameter():
 
 
 @pytest.fixture(scope="module")
-def hyperopt_results_multiple_parameters():
+def hyperopt_results_multiple_parameters(ray_cluster_4cpu):
+    """This fixture is used by hyperopt visualization tests in test_visualization_api.py."""
     config, rel_path = _get_sample_config()
     output_feature_name = config[OUTPUT_FEATURES][0][NAME]
     config[HYPEROPT] = {
tests/integration_tests/test_whylogs.py: 7 additions & 28 deletions

@@ -1,4 +1,3 @@
-import contextlib
 import os
 import shutil
 
@@ -7,25 +6,7 @@
 from ludwig.api import LudwigModel
 from ludwig.constants import TRAINER
 from ludwig.contribs import WhyLogsCallback
-from tests.integration_tests.utils import category_feature, generate_data, sequence_feature, spawn
-
-try:
-    import ray
-except ImportError:
-    ray = None
-
-
-@contextlib.contextmanager
-def ray_start(num_cpus=2):
-    res = ray.init(
-        num_cpus=num_cpus,
-        include_dashboard=False,
-        object_store_memory=150 * 1024 * 1024,
-    )
-    try:
-        yield res
-    finally:
-        ray.shutdown()
+from tests.integration_tests.utils import category_feature, generate_data, sequence_feature
 
 
 def test_whylogs_callback_local(tmpdir):
@@ -64,7 +45,7 @@ def test_whylogs_callback_local(tmpdir):
 
 
 @pytest.mark.distributed
-def test_whylogs_callback_dask(tmpdir):
+def test_whylogs_callback_dask(tmpdir, ray_cluster_4cpu):
     num_examples = 100
 
     input_features = [sequence_feature(encoder={"reduce_output": "sum"})]
@@ -84,7 +65,6 @@
     assert os.path.isdir(local_prediction_output_dir) is True
 
 
-@spawn
 def run_dask(input_features, output_features, data_csv, val_csv, test_csv):
     epochs = 2
     batch_size = 8
@@ -109,9 +89,8 @@ def run_dask(input_features, output_features, data_csv, val_csv, test_csv):
         TRAINER: {"epochs": epochs, "batch_size": batch_size},
     }
 
-    with ray_start(num_cpus=4):
-        exp_name = "whylogs_test_ray"
-        callback = WhyLogsCallback()
-        model = LudwigModel(config, backend=backend, callbacks=[callback])
-        model.train(training_set=data_csv, validation_set=val_csv, test_set=test_csv, experiment_name=exp_name)
-        _, _ = model.predict(test_csv)
+    exp_name = "whylogs_test_ray"
+    callback = WhyLogsCallback()
+    model = LudwigModel(config, backend=backend, callbacks=[callback])
+    model.train(training_set=data_csv, validation_set=val_csv, test_set=test_csv, experiment_name=exp_name)
+    _, _ = model.predict(test_csv)
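
The pattern in this file is the heart of the commit: the ad-hoc ray_start context manager and the @spawn subprocess decorator are dropped in favor of the shared ray_cluster_4cpu fixture, so pytest owns the Ray cluster's lifecycle. Presumably the fixture's teardown calls ray.shutdown() once the consuming tests finish, which is what keeps ghost Ray instances from leaking into later tests.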
tests/ludwig/automl/test_data_source.py: 1 addition & 1 deletion

@@ -21,7 +21,7 @@
 
 
 @pytest.mark.distributed
-def test_mixed_csv_data_source():
+def test_mixed_csv_data_source(ray_cluster_2cpu):
     try:
         temp = tempfile.NamedTemporaryFile(mode="w+")
         temp.write(CSV_CONTENT)
tests/ludwig/data/test_split.py: 5 additions & 5 deletions

@@ -22,7 +22,7 @@
         pytest.param(DaskEngine(_use_ray=False), id="dask", marks=pytest.mark.distributed),
     ],
 )
-def test_random_split(df_engine):
+def test_random_split(df_engine, ray_cluster_2cpu):
     nrows = 100
     npartitions = 10
 
@@ -70,7 +70,7 @@ def compute(dfs):
         pytest.param(DaskEngine(_use_ray=False), id="dask", marks=pytest.mark.distributed),
     ],
 )
-def test_random_split_zero_probability_for_test_produces_no_zombie(df_engine):
+def test_random_split_zero_probability_for_test_produces_no_zombie(df_engine, ray_cluster_2cpu):
     nrows = 102
     npartitions = 10
 
@@ -99,7 +99,7 @@ def test_random_split_zero_probability_for_test_produces_no_zombie(df_engine):
         pytest.param(DaskEngine(_use_ray=False), id="dask", marks=pytest.mark.distributed),
     ],
 )
-def test_fixed_split(df_engine):
+def test_fixed_split(df_engine, ray_cluster_2cpu):
     nrows = 100
     npartitions = 10
     thresholds = [60, 80, 100]
@@ -155,7 +155,7 @@ def get_split(v):
         pytest.param(np.array([0.6, 0.2, 0.2]), id="imbalanced"),
     ],
 )
-def test_stratify_split(df_engine, nrows, atol, class_probs):
+def test_stratify_split(df_engine, nrows, atol, class_probs, ray_cluster_2cpu):
     npartitions = 10
     thresholds = np.cumsum((class_probs * nrows).astype(int))
 
@@ -218,7 +218,7 @@ def get_category(v):
         pytest.param(DaskEngine(_use_ray=False), id="dask", marks=pytest.mark.distributed),
     ],
 )
-def test_datetime_split(df_engine):
+def test_datetime_split(df_engine, ray_cluster_2cpu):
     nrows = 100
     npartitions = 10
 
tests/ludwig/models/test_training_determinism.py: 1 addition & 1 deletion

@@ -25,7 +25,7 @@
 
 
 @pytest.mark.distributed
-def test_training_determinism_ray_backend(csv_filename, tmpdir):
+def test_training_determinism_ray_backend(csv_filename, tmpdir, ray_cluster_4cpu):
     experiment_output_1, experiment_output_2 = train_twice("ray", csv_filename, tmpdir)
 
     eval_stats_1, train_stats_1, _, _ = experiment_output_1
tests/ludwig/utils/test_dataframe_utils.py: 2 additions & 2 deletions

@@ -12,7 +12,7 @@
 
 
 @pytest.mark.distributed
-def test_to_numpy_dataset_with_dask():
+def test_to_numpy_dataset_with_dask(ray_cluster_2cpu):
     dd_df = dd.from_pandas(pd.DataFrame([[1, 2, 3]], columns=["col1", "col2", "col3"]), npartitions=1)
     ray_backend = create_backend("ray")
 
@@ -54,7 +54,7 @@ def test_to_numpy_dataset_empty():
 
 
 @pytest.mark.distributed
-def test_to_numpy_dataset_with_pandas_backend_mismatch():
+def test_to_numpy_dataset_with_pandas_backend_mismatch(ray_cluster_2cpu):
     pd_df = pd.DataFrame([[1, 2, 3]], columns=["col1", "col2", "col3"])
     ray_backend = create_backend("ray")
 
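
The Ray-dependent tests in this commit are gated behind the distributed marker, either on the test itself or on its Dask parameter. Assuming standard pytest marker selection, they can be run with, for example:

    pytest -m distributed tests/ludwig/data/test_split.py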
