[Ray] Basic slow subtask detection (mars-project#3305)
* Basic slow subtask detection

* Replace sklearn.utils._testing.assert_warns with pytest.warns

* Fix load_boston

* Fix rerun_time type

* Remove rerun_time from Task and Subtask

* Fix

* Fix

* Add UT

* Fix lint

* Fix lint

* Refine logs

* Fix

* Refine code

* Add check_slow_subtasks_iqr_ratio config

Co-authored-by: 刘宝 <po.lb@antgroup.com>
fyrestone and 刘宝 committed Dec 23, 2022
1 parent 16843aa commit bde43bb
Showing 11 changed files with 274 additions and 53 deletions.
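
Most of the test changes below follow the same mechanical migration: sklearn.utils._testing.assert_warns (dropped from recent scikit-learn releases) is replaced by the pytest.warns context manager, and the removed load_boston dataset is replaced by load_iris. A minimal sketch of the warning-assertion pattern, where noisy() is a hypothetical stand-in rather than a function from this repository:

import warnings

import pytest


def noisy(x):
    # stand-in for any call that is expected to emit a warning
    warnings.warn("slow path taken", RuntimeWarning)
    return x * 2


def test_noisy_warns():
    # old style: result = assert_warns(RuntimeWarning, noisy, 21)
    # new style: the context manager fails the test if no warning is raised
    with pytest.warns(RuntimeWarning):
        result = noisy(21)
    assert result == 42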
6 changes: 3 additions & 3 deletions mars/contrib/dask/tests/test_dask.py
@@ -75,15 +75,15 @@ def test_unpartitioned_dataframe(setup_cluster):
from dask import dataframe as dd
from pandas._testing import assert_frame_equal
import pandas as pd
-from sklearn.datasets import load_boston
+from sklearn.datasets import load_iris

-boston = load_boston()
+boston = load_iris()
pd.DataFrame(boston.data, columns=boston["feature_names"]).to_csv(
"./boston_housing_data.csv"
)

df = dd.read_csv(r"./boston_housing_data.csv")
df["CRIM"] = df["CRIM"] / 2
df["sepal length (cm)"] = df["sepal length (cm)"] / 2

dask_res = df.compute()
assert_frame_equal(dask_res, df.compute(scheduler=mars_scheduler))
17 changes: 8 additions & 9 deletions mars/learn/cluster/tests/test_k_means.py
@@ -22,7 +22,7 @@
try:
from sklearn.datasets import make_blobs
from sklearn.metrics.cluster import v_measure_score
-from sklearn.utils._testing import assert_raise_message, assert_warns
+from sklearn.utils._testing import assert_raise_message
except ImportError:
pass

@@ -485,14 +485,13 @@ def test_k_means_function(setup):
assert inertia > 0.0

# check warning when centers are passed
-assert_warns(
-RuntimeWarning,
-k_means,
-X,
-n_clusters=n_clusters,
-sample_weight=None,
-init=centers,
-)
+with pytest.warns(RuntimeWarning):
+k_means(
+X,
+n_clusters=n_clusters,
+sample_weight=None,
+init=centers,
+)

# to many clusters desired
with pytest.raises(ValueError):
5 changes: 3 additions & 2 deletions mars/learn/metrics/pairwise/tests/test_pariwise_distances.py
@@ -17,7 +17,7 @@
from sklearn.metrics import pairwise_distances as sk_pairwise_distances
from sklearn.neighbors import NearestNeighbors as SkNearestNeighbors
from sklearn.exceptions import DataConversionWarning
-from sklearn.utils._testing import assert_warns


from ..... import tensor as mt
from .....session import execute, fetch
@@ -63,7 +63,8 @@ def test_pairwise_distances_execution(setup):
expected = sk_pairwise_distances(raw_x, raw_y, metric=m)
np.testing.assert_almost_equal(result, expected)

-assert_warns(DataConversionWarning, pairwise_distances, x, y, metric="jaccard")
+with pytest.warns(DataConversionWarning):
+pairwise_distances(x, y, metric="jaccard")

with pytest.raises(ValueError):
_ = pairwise_distances(x, y, metric="unknown")
7 changes: 4 additions & 3 deletions mars/learn/metrics/tests/test_ranking.py
@@ -27,7 +27,6 @@
from sklearn.exceptions import UndefinedMetricWarning
from sklearn.utils import check_random_state
from sklearn.utils._testing import (
-assert_warns,
assert_almost_equal,
assert_array_almost_equal,
)
@@ -166,14 +165,16 @@ def test_roc_curve_one_label(setup):
y_pred = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1]
# assert there are warnings
w = UndefinedMetricWarning
-fpr, tpr, thresholds = assert_warns(w, roc_curve, y_true, y_pred)
+with pytest.warns(w):
+fpr, tpr, thresholds = roc_curve(y_true, y_pred)
# all true labels, all fpr should be nan
np.testing.assert_array_equal(fpr.fetch(), np.full(len(thresholds), np.nan))
assert fpr.shape == tpr.shape
assert fpr.shape == thresholds.shape

# assert there are warnings
-fpr, tpr, thresholds = assert_warns(w, roc_curve, [1 - x for x in y_true], y_pred)
+with pytest.warns(w):
+fpr, tpr, thresholds = roc_curve([1 - x for x in y_true], y_pred)
# all negative labels, all tpr should be nan
np.testing.assert_array_equal(tpr.fetch(), np.full(len(thresholds), np.nan))
assert fpr.shape == tpr.shape
7 changes: 4 additions & 3 deletions mars/learn/neighbors/tests/test_nearest_neighbors.py
@@ -24,7 +24,6 @@
from sklearn.neighbors import NearestNeighbors as SkNearestNeighbors
from sklearn.neighbors import BallTree as SkBallTree
from sklearn.neighbors import KDTree as SkKDTree
-from sklearn.utils._testing import assert_warns
except ImportError: # pragma: no cover
SkNearestNeighbors = None

@@ -66,7 +65,8 @@ def test_nearest_neighbors(setup):
with pytest.raises(ValueError):
_ = NearestNeighbors(algorithm="auto", metric="unknown")

-assert_warns(SyntaxWarning, NearestNeighbors, metric_params={"p": 1})
+with pytest.warns(SyntaxWarning):
+NearestNeighbors(metric_params={"p": 1})

with pytest.raises(ValueError):
_ = NearestNeighbors(metric="wminkowski", p=0)
@@ -105,7 +105,8 @@ def test_nearest_neighbors(setup):
nn.fit(np.random.rand(0, 10))

nn = NearestNeighbors(algorithm="ball_tree")
-assert_warns(UserWarning, nn.fit, X_sparse)
+with pytest.warns(UserWarning):
+nn.fit(X_sparse)

nn = NearestNeighbors(metric="haversine")
with pytest.raises(ValueError):
5 changes: 3 additions & 2 deletions mars/learn/semi_supervised/tests/test_label_propagation.py
@@ -17,7 +17,7 @@
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.exceptions import ConvergenceWarning
-from sklearn.utils._testing import assert_no_warnings, assert_warns
+from sklearn.utils._testing import assert_no_warnings

from .... import tensor as mt
from ...metrics.pairwise import rbf_kernel
@@ -103,7 +103,8 @@ def test_convergence_warning(setup):
y = np.array([0, 1, -1])

mdl = LabelPropagation(kernel="rbf", max_iter=1)
-assert_warns(ConvergenceWarning, mdl.fit, X, y)
+with pytest.warns(ConvergenceWarning):
+mdl.fit(X, y)
assert mdl.n_iter_ == mdl.max_iter

mdl = LabelPropagation(kernel="rbf", max_iter=500)
7 changes: 3 additions & 4 deletions mars/services/subtask/core.py
@@ -52,7 +52,7 @@ def is_done(self) -> bool:


class Subtask(Serializable):
__slots__ = ("_repr", "_pure_depend_keys")
__slots__ = ("_repr", "_pure_depend_keys", "runtime")

subtask_id: str = StringField("subtask_id")
subtask_name: str = StringField("subtask_name")
@@ -65,7 +65,6 @@ class Subtask(Serializable):
virtual: bool = BoolField("virtual")
retryable: bool = BoolField("retryable")
priority: Tuple[int, int] = TupleField("priority", FieldTypes.int32)
-rerun_time: int = Int32Field("rerun_time")
extra_config: dict = DictField("extra_config")
stage_id: str = StringField("stage_id")
# chunks that need meta updated
@@ -95,7 +94,6 @@ def __init__(
priority: Tuple[int, int] = None,
virtual: bool = False,
retryable: bool = True,
-rerun_time: int = 0,
extra_config: dict = None,
stage_id: str = None,
update_meta_chunks: List[ChunkType] = None,
@@ -116,7 +114,6 @@
priority=priority,
virtual=virtual,
retryable=retryable,
-rerun_time=rerun_time,
extra_config=extra_config,
stage_id=stage_id,
update_meta_chunks=update_meta_chunks,
@@ -129,11 +126,13 @@
)
self._pure_depend_keys = None
self._repr = None
+self.runtime = None

def __on_deserialize__(self):
super(Subtask, self).__on_deserialize__()
self._pure_depend_keys = None
self._repr = None
+self.runtime = None

@property
def expect_band(self):
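
The new runtime slot replaces the serialized rerun_time field. What Mars actually keeps in Subtask.runtime is not shown in this hunk; the pattern it follows is that runtime state lives only on the worker, is never serialized with the subtask, and is therefore reset to None both in __init__ and after deserialization. A rough sketch of that pattern under those assumptions (SubtaskRuntime and SubtaskLike are illustrative names, not Mars classes):

from typing import Optional


class SubtaskRuntime:
    # hypothetical per-execution bookkeeping, e.g. start time and rerun count
    # consulted by slow-subtask checks
    def __init__(self) -> None:
        self.start_time: float = 0.0
        self.rerun_count: int = 0


class SubtaskLike:
    __slots__ = ("subtask_id", "runtime")

    def __init__(self, subtask_id: str):
        self.subtask_id = subtask_id  # a serialized field in the real class
        self.runtime: Optional[SubtaskRuntime] = None  # never serialized

    def __getstate__(self):
        # runtime is deliberately omitted from the pickled state
        return {"subtask_id": self.subtask_id}

    def __setstate__(self, state):
        self.subtask_id = state["subtask_id"]
        self.runtime = None  # mirrors __on_deserialize__ in the diff above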
4 changes: 0 additions & 4 deletions mars/services/task/core.py
@@ -22,7 +22,6 @@
Serializable,
StringField,
ReferenceField,
-Int32Field,
BoolField,
AnyField,
DictField,
@@ -41,7 +40,6 @@ class Task(Serializable):
session_id: str = StringField("session_id")
tileable_graph: TileableGraph = ReferenceField("tileable_graph", TileableGraph)
fuse_enabled: bool = BoolField("fuse_enabled")
-rerun_time: int = Int32Field("rerun_time")
extra_config: dict = DictField("extra_config")

def __init__(
@@ -50,15 +48,13 @@ def __init__(
session_id: str = None,
tileable_graph: TileableGraph = None,
fuse_enabled: bool = True,
-rerun_time: int = 0,
extra_config: dict = None,
):
super().__init__(
task_id=task_id,
session_id=session_id,
tileable_graph=tileable_graph,
fuse_enabled=fuse_enabled,
-rerun_time=rerun_time,
extra_config=extra_config,
)

14 changes: 14 additions & 0 deletions mars/services/task/execution/ray/config.py
@@ -28,6 +28,7 @@
# The default interval seconds to update progress and collect garbage.
DEFAULT_MONITOR_INTERVAL_SECONDS = 0 if IN_RAY_CI else 1
DEFAULT_LOG_INTERVAL_SECONDS = 60
+DEFAULT_CHECK_SLOW_SUBTASKS_INTERVAL_SECONDS = 120


@register_config_cls
@@ -77,6 +78,19 @@ def get_log_interval_seconds(self):
"log_interval_seconds", DEFAULT_LOG_INTERVAL_SECONDS
)

+def get_check_slow_subtasks_interval_seconds(self) -> float:
+return self._ray_execution_config.get(
+"check_slow_subtasks_interval_seconds",
+DEFAULT_CHECK_SLOW_SUBTASKS_INTERVAL_SECONDS,
+)
+
+def get_check_slow_subtask_iqr_ratio(self) -> float:
+# https://en.wikipedia.org/wiki/Box_plot
+# iqr = q3 - q1
+# duration_threshold = q3 + check_slow_subtasks_iqr_ratio * (q3 - q1)
+# So, the value == 3, extremely slow(probably hang); value == 1.5, slow
+return self._ray_execution_config.get("check_slow_subtasks_iqr_ratio", 3)

def get_shuffle_fetch_type(self) -> ShuffleFetchType:
return ShuffleFetchType.FETCH_BY_INDEX
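
The comment block above is the standard box-plot outlier rule: with IQR = q3 - q1, a running subtask counts as slow once its duration exceeds q3 + check_slow_subtasks_iqr_ratio * IQR, so the default ratio of 3 only flags extreme outliers (probable hangs), while 1.5 is the conventional mild-outlier fence. A minimal sketch of how such a threshold could be applied; find_slow_subtasks is illustrative and not the detection routine added by this commit:

import numpy as np


def find_slow_subtasks(finished_durations, running_durations, iqr_ratio=3.0):
    # Compute the box-plot upper fence from finished subtask durations and
    # return the indices of running subtasks whose duration exceeds it.
    if len(finished_durations) < 4:
        return []  # too few samples for meaningful quartiles
    q1, q3 = np.percentile(finished_durations, [25, 75])
    threshold = q3 + iqr_ratio * (q3 - q1)
    return [i for i, duration in enumerate(running_durations) if duration > threshold]


# With the default ratio of 3, only the 10-second subtask is flagged:
print(find_slow_subtasks([1.0, 1.2, 1.1, 0.9, 1.3], [1.0, 10.0]))  # -> [1]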

Expand Down