Skip to content

Commit

Permalink
Fix stop graph failed due to failure of serialization (#44)
Browse files Browse the repository at this point in the history
* fix stop graph failed due to failure of serialization
  • Loading branch information
qinxuye authored and wjsi committed Dec 18, 2018
1 parent dff897a commit 14172de
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
25 changes: 24 additions & 1 deletion mars/deploy/local/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import unittest

import numpy as np

from mars import tensor as mt
from mars.operands import Operand
from mars.tensor.expressions.arithmetic.core import TensorElementWise
from mars.serialize import Int64Field
from mars.config import options
from mars.session import new_session, Session
from mars.deploy.local.core import new_cluster, LocalDistributedCluster, gen_endpoint
Expand All @@ -28,6 +30,19 @@
from mars.worker.dispatcher import DispatchActor


def _on_deserialize_fail(x):
raise TypeError('intend to throw error on' + str(x))


class SerializeMustFailOperand(Operand, TensorElementWise):
_op_type_ = 356789

_f = Int64Field('f', on_deserialize=_on_deserialize_fail)

def __init__(self, f=None, **kw):
super(SerializeMustFailOperand, self).__init__(_f=f, **kw)


class Test(unittest.TestCase):
def setUp(self):
self._old_cache_memory_limit = options.worker.cache_memory_limit
Expand Down Expand Up @@ -139,3 +154,11 @@ def testMultipleOutputTensorExecute(self):

np.testing.assert_allclose(U_result, U_expected)
np.testing.assert_allclose(s_result, s_expectd)

def testGraphFail(self):
op = SerializeMustFailOperand(f=3)
tensor = op.new_tensor(None, (3, 3))

with new_cluster(scheduler_n_process=2, worker_n_process=2) as cluster:
with self.assertRaises(SystemError):
cluster.session.run(tensor)
8 changes: 7 additions & 1 deletion mars/scheduler/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,13 @@ def stop_graph(self):
from .operand import OperandActor
self.state = GraphState.CANCELLING

for chunk in self.get_chunk_graph():
try:
chunk_graph = self.get_chunk_graph()
except KeyError:
self.state = GraphState.CANCELLED
return

for chunk in chunk_graph:
if chunk.op.key not in self._operand_infos:
continue
if self._operand_infos[chunk.op.key]['state'] in \
Expand Down

0 comments on commit 14172de

Please sign in to comment.