Skip to content

Commit

Permalink
Refine failure recovery log and exception (#2633)
Browse files Browse the repository at this point in the history
* Refine failure recovery (fo) log and exception

* Pin xgboost_ray to 0.1.5

Co-authored-by: 留宝 <po.lb@antgroup.com>
Co-authored-by: 刘宝 <po.lb@antfin.com>
  • Loading branch information
3 people committed Feb 9, 2022
1 parent bacb2b5 commit 61c8eac
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/platform-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ jobs:
fi
if [ -n "$WITH_RAY" ]; then
pip install pip install ray[default]
pip install xgboost_ray
pip install xgboost_ray==0.1.5
pip install --upgrade numpy
fi
if [ -n "$RUN_DASK" ]; then
Expand Down
43 changes: 36 additions & 7 deletions mars/deploy/oscar/tests/test_fault_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import os
import pytest
import traceback
import numpy as np
import pandas as pd

Expand Down Expand Up @@ -239,22 +240,43 @@ async def test_rerun_subtask_describe(fault_cluster, fault_config):
@pytest.mark.parametrize(
"fault_cluster", [{"config": RERUN_SUBTASK_CONFIG_FILE}], indirect=True
)
@pytest.mark.parametrize(
"fault_config",
[
[
FaultType.UnhandledException,
{FaultPosition.ON_EXECUTE_OPERAND: 1},
pytest.raises(FaultInjectionUnhandledError),
["_UnhandledException", "handle_fault"],
],
[
FaultType.Exception,
{FaultPosition.ON_EXECUTE_OPERAND: 100},
pytest.raises(FaultInjectionError),
["_ExceedMaxRerun", "handle_fault"],
],
],
)
@pytest.mark.asyncio
async def test_rerun_subtask_unhandled(fault_cluster):
async def test_rerun_subtask_fail(fault_cluster, fault_config):
fault_type, fault_count, expect_raises, exception_match = fault_config
name = await create_fault_injection_manager(
session_id=fault_cluster.session.session_id,
address=fault_cluster.session.address,
fault_count={FaultPosition.ON_EXECUTE_OPERAND: 1},
fault_type=FaultType.UnhandledException,
fault_count=fault_count,
fault_type=fault_type,
)
exception_typename, stack_string = exception_match
extra_config = {ExtraConfigKey.FAULT_INJECTION_MANAGER_NAME: name}

raw = np.random.RandomState(0).rand(10, 10)
a = mt.tensor(raw, chunk_size=5)
b = a + 1

with pytest.raises(FaultInjectionUnhandledError):
with expect_raises as e:
b.execute(extra_config=extra_config)
assert e.typename == exception_typename, "".join(traceback.format_tb(e.tb))
assert e.traceback[-1].name == stack_string, "".join(traceback.format_tb(e.tb))


@pytest.mark.parametrize(
Expand All @@ -266,29 +288,36 @@ async def test_rerun_subtask_unhandled(fault_cluster):
[
FaultType.Exception,
{FaultPosition.ON_EXECUTE_OPERAND: 1},
pytest.raises(FaultInjectionError, match="Fault Injection"),
pytest.raises(FaultInjectionError, match="RemoteFunction"),
["_UnretryableException", "handle_fault"],
],
[
FaultType.ProcessExit,
{FaultPosition.ON_EXECUTE_OPERAND: 1},
pytest.raises(ServerClosed),
["_UnretryableException", "*"],
],
],
)
@pytest.mark.asyncio
async def test_retryable(fault_cluster, fault_config):
fault_type, fault_count, expect_raises = fault_config
fault_type, fault_count, expect_raises, exception_match = fault_config
name = await create_fault_injection_manager(
session_id=fault_cluster.session.session_id,
address=fault_cluster.session.address,
fault_count=fault_count,
fault_type=fault_type,
)
exception_typename, stack_string = exception_match
extra_config = {ExtraConfigKey.FAULT_INJECTION_MANAGER_NAME: name}

def f(x):
return x + 1

r = spawn(f, args=(1,), retry_when_fail=False)
with expect_raises:
with expect_raises as e:
r.execute(extra_config=extra_config)
assert e.typename == exception_typename, "".join(traceback.format_tb(e.tb))
assert stack_string == "*" or e.traceback[-1].name == stack_string, "".join(
traceback.format_tb(e.tb)
)
25 changes: 22 additions & 3 deletions mars/deploy/oscar/tests/test_ray_fault_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,26 @@ async def test_rerun_subtask_describe(ray_start_regular, fault_cluster, fault_co
@pytest.mark.parametrize(
"fault_cluster", [{"config": SUBTASK_RERUN_CONFIG}], indirect=True
)
@pytest.mark.parametrize(
"fault_config",
[
[
FaultType.UnhandledException,
{FaultPosition.ON_EXECUTE_OPERAND: 1},
pytest.raises(FaultInjectionUnhandledError),
["_UnhandledException", "handle_fault"],
],
[
FaultType.Exception,
{FaultPosition.ON_EXECUTE_OPERAND: 100},
pytest.raises(FaultInjectionError),
["_ExceedMaxRerun", "handle_fault"],
],
],
)
@pytest.mark.asyncio
async def test_rerun_subtask_unhandled(ray_start_regular, fault_cluster):
await test_fault_injection.test_rerun_subtask_unhandled(fault_cluster)
async def test_rerun_subtask_fail(ray_start_regular, fault_cluster, fault_config):
await test_fault_injection.test_rerun_subtask_fail(fault_cluster, fault_config)


@require_ray
Expand All @@ -167,12 +184,14 @@ async def test_rerun_subtask_unhandled(ray_start_regular, fault_cluster):
[
FaultType.Exception,
{FaultPosition.ON_EXECUTE_OPERAND: 1},
pytest.raises(FaultInjectionError, match="Fault Injection"),
pytest.raises(FaultInjectionError, match="RemoteFunction"),
["_UnretryableException", "handle_fault"],
],
[
FaultType.ProcessExit,
{FaultPosition.ON_EXECUTE_OPERAND: 1},
pytest.raises(ServerClosed),
["_UnretryableException", "*"],
],
],
)
Expand Down
62 changes: 49 additions & 13 deletions mars/services/scheduling/worker/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import functools
import logging
import operator
import pprint
import sys
from collections import defaultdict
from dataclasses import dataclass, field
Expand Down Expand Up @@ -67,28 +68,45 @@ async def _retry_run(
except (OSError, MarsError) as ex:
if subtask_info.num_retries < subtask_info.max_retries:
logger.error(
"Rerun the %s of subtask %s due to %s",
"Rerun[%s/%s] the %s of subtask %s due to %s.",
subtask_info.num_retries,
subtask_info.max_retries,
target_async_func,
subtask.subtask_id,
ex,
)
subtask_info.num_retries += 1
continue
raise ex
if subtask_info.max_retries > 0:
message = (
f"Exceed max rerun[{subtask_info.num_retries}/{subtask_info.max_retries}]:"
f" {target_async_func} of subtask {subtask.subtask_id} due to {ex}."
)
logger.error(message)

class _ExceedMaxRerun(type(ex)):
pass

raise _ExceedMaxRerun(message).with_traceback(ex.__traceback__)
else:
raise ex
except asyncio.CancelledError:
raise
except Exception as ex:
if subtask_info.num_retries < subtask_info.max_retries:
logger.error(
"Failed to rerun the %s of subtask %s, "
"num_retries: %s, max_retries: %s, unhandled exception: %s",
target_async_func,
subtask.subtask_id,
subtask_info.num_retries,
subtask_info.max_retries,
ex,
if subtask_info.max_retries > 0:
message = (
f"Failed to rerun the {target_async_func} of subtask {subtask.subtask_id}, "
f"num_retries: {subtask_info.num_retries}, max_retries: {subtask_info.max_retries} "
f"due to unhandled exception: {ex}."
)
raise ex
logger.error(message)

class _UnhandledException(type(ex)):
pass

raise _UnhandledException(message).with_traceback(ex.__traceback__)
else:
raise ex


def _fill_subtask_result_with_exception(
Expand Down Expand Up @@ -403,7 +421,25 @@ async def _run_subtask_once():
if subtask.retryable:
return await _retry_run(subtask, subtask_info, _run_subtask_once)
else:
return await _run_subtask_once()
try:
return await _run_subtask_once()
except Exception as e:
unretryable_op = [
chunk.op
for chunk in subtask.chunk_graph
if not getattr(chunk.op, "retryable", True)
]
message = (
f"Run subtask failed due to {e}, the subtask {subtask.subtask_id} is "
f"not retryable, it contains unretryable op: \n"
f"{pprint.pformat(unretryable_op)}"
)
logger.error(message)

class _UnretryableException(type(e)):
pass

raise _UnretryableException(message).with_traceback(e.__traceback__)

async def run_subtask(
self, subtask: Subtask, band_name: str, supervisor_address: str
Expand Down
2 changes: 1 addition & 1 deletion mars/services/task/tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def f1():

await task_api.wait_task(task_id, timeout=10)
results = await task_api.get_task_results(progress=True)
assert type(results[0].error) is SystemError
assert isinstance(results[0].error, SystemError)


@pytest.mark.asyncio
Expand Down

0 comments on commit 61c8eac

Please sign in to comment.