Skip to content

Commit

Permalink
Finish traverse.
Browse files Browse the repository at this point in the history
  • Loading branch information
severb committed Jul 6, 2015
1 parent 8d9dda7 commit e67ebc0
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 89 deletions.
42 changes: 34 additions & 8 deletions flowy/config.py
Expand Up @@ -6,10 +6,12 @@

from flowy.result import is_result_proxy
from flowy.result import restart_type
from flowy.result import SuspendTask
from flowy.result import TaskError
from flowy.result import wait
from flowy.serialization import dumps
from flowy.serialization import traverse_dumps
from flowy.serialization import loads
from flowy.serialization import traverse_data
from flowy.utils import logger


Expand Down Expand Up @@ -146,12 +148,12 @@ def __init__(self, deserialize_input=None, serialize_result=None,

def serialize_restart_input(self, *args, **kwargs):
"""Try to serialize the result, returns any errors or placeholders."""
return traverse_dumps([args, kwargs])
return dumps([args, kwargs])

@staticmethod
def serialize_result(result):
"""Try to serialize the result, returns any errors or placeholders."""
return traverse_dumps(result)
return dumps(result)

def _check_dep(self, dep_name):
"""Check if dep_name is a unique valid identifier name."""
Expand Down Expand Up @@ -216,12 +218,36 @@ def _workflow_wrapper(self, factory, input_data, *extra_args):
# fixes another issue, on python2 isinstance() swallows any
# exception while python3 it doesn't.
if not is_result_proxy(result) and isinstance(result, restart_type):
r_input_data = self.serialize_restart_input(*result.args, **result.kwargs)
raise Restart(r_input_data)
try:
traversed_input, error, placeholders = traverse_data(
[result.args, result.kwargs])
except Exception:
logger.exception('Cannot traverse the restart arguments:')
raise ValueError(
'Cannot traverse the restart arguments: %r, %r' %
result.args, result.kwargs)
wait(error) # raise if not None
if placeholders:
raise SuspendTask
r_args, r_kwargs = traversed_input
try:
serialized_input = self.serialize_restart_input(*r_args, **r_kwargs)
except Exception:
logger.exception('Cannot serialize the restart arguments:')
raise ValueError(
'Cannot serialize the restart arguments: %r, %r' %
result.args, result.kwargs)
raise Restart(serialized_input)
try:
return self.serialize_result(result)
except TaskError:
raise # let task errors go trough
traversed_result, error, placeholders = traverse_data(result)
except Exception:
logger.exception('Cannot traverse the result:')
raise ValueError('Cannot traverse the result: %r' % result)
wait(error)
if placeholders:
raise SuspendTask
try:
return self.serialize_result(traversed_result)
except Exception:
logger.exception('Cannot serialize the result:')
raise ValueError('Cannot serialize the result: %r' % (result,))
Expand Down
14 changes: 8 additions & 6 deletions flowy/proxy.py
Expand Up @@ -8,8 +8,9 @@
from flowy.result import SuspendTask
from flowy.result import timeout
from flowy.result import wait
from flowy.serialization import traverse_dumps
from flowy.serialization import dumps
from flowy.serialization import loads
from flowy.serialization import traverse_data
from flowy.utils import logger


Expand Down Expand Up @@ -93,14 +94,15 @@ def __call__(self, *args, **kwargs):
order = task_exec_history.order(call_number, retry_number)
r = error(err, order)
break
errors, placeholders = scan_args(args, kwargs)
if errors:
r = copy_result_proxy(first(errors))
traversed_args, err, placeholders = traverse_data([args, kwargs])
if err:
r = copy_result_proxy(err)
break
if placeholders:
break # result = Placeholder
t_args, t_kwargs = traversed_args
try:
input_data = self.serialize_input(*args, **kwargs)
input_data = self.serialize_input(*t_args, **t_kwargs)
except Exception as e:
logger.exception('Error while serializing the task input:')
self.task_decision.fail(e)
Expand All @@ -115,7 +117,7 @@ def __call__(self, *args, **kwargs):

@staticmethod
def serialize_input(*args, **kwargs):
return traverse_dumps([args, kwargs])
return dumps([args, kwargs])

@staticmethod
def deserialize_result(result):
Expand Down
101 changes: 57 additions & 44 deletions flowy/serialization.py
Expand Up @@ -6,6 +6,12 @@
task results. Any serializer is supposed to do that.
"""

import sys
if sys.version_info < (3,):
uni = unicode
else:
uni = str

import collections
import json
import uuid
Expand All @@ -16,80 +22,72 @@
from flowy.operations import first


__all__ = ['traverse_dumps', 'dumps', 'loads']
__all__ = ['traverse_data', 'dumps', 'loads']


def traverse_dumps(value):
"""Serialize the data structure and checks for errors or placeholders.
def traverse_data(value):
"""Traveres the data structure and collect errors or placeholders.
Returns a 3-tuple: serialized data, oldest error, placehoders flag
Returns a 3-tuple: traversed data, oldest error, placehoders flag
The traversed suffers some changes:
* any mappable becomes a dict
* any iterator becomes a tuple
Serializing values should work as expected
>>> traverse_dumps(1)
('1', None, False)
>>> traverse_dumps(u'abc')
('"abc"', None, False)
>>> traverse_dumps([1, 2, 3, [4]])
('[1, 2, 3, [4]]', None, False)
>>> traverse_data(1)
(1, None, False)
>>> traverse_data(u'abc') == (u'abc', None, False)
True
>>> traverse_data([1, 2, 3, (4,)])
((1, 2, 3, (4,)), None, False)
>>> from flowy.result import error, placeholder, result
>>> r0 = result(u'r0', 0)
>>> e1 = error('err1', 1)
>>> e2 = error('err2', 2)
>>> e3 = error('err3', 3)
>>> r4 = result(u'r4', 4)
>>> r4 = result(4, 4)
>>> ph = placeholder()
Results work just like values
>>> traverse_dumps([r0, r4])
('["r0", "r4"]', None, False)
>>> traverse_dumps({r0: r4})
('{"r0": "r4"}', None, False)
>>> traverse_dumps((1, 2, 'a', r0))
('[1, 2, "a", "r0"]', None, False)
>>> traverse_data([r0, r4]) == ((u'r0', 4), None, False)
True
>>> traverse_data({r0: r4}) == ({u'r0': 4}, None, False)
True
>>> traverse_data((1, 2, 'a', r0)) == ((1, 2, 'a', u'r0'), None, False)
True
Any placeholder should be detected
>>> traverse_dumps(ph)
>>> traverse_data(ph)
(None, None, True)
>>> traverse_dumps([r0, [r4, [ph]]])
>>> traverse_data([r0, [r4, [ph]]])
(None, None, True)
>>> traverse_dumps({'a': {r0: {'b': ph}}})
>>> traverse_data({'a': {r0: {'b': ph}}})
(None, None, True)
>>> traverse_dumps([[[ph], ph]])
>>> traverse_data([[[ph], ph]])
(None, None, True)
(Oldest) Error has priority over placeholders flag
>>> r = traverse_dumps(e1)
>>> r = traverse_data(e1)
>>> r[0], r[1] is e1, r[2]
(None, True, False)
>>> r = traverse_dumps([e3, e1, e2])
>>> r = traverse_data([e3, e1, e2])
>>> r[0], r[1] is e1, r[2]
(None, True, False)
>>> r = traverse_dumps([e3, [e1, [e2]]])
>>> r = traverse_data([e3, [e1, [e2]]])
>>> r[0], r[1] is e1, r[2]
(None, True, False)
>>> r = traverse_dumps([r0, e2, r4])
>>> r = traverse_data([r0, e2, r4])
>>> r[0], r[1] is e2, r[2]
(None, True, False)
>>> r = traverse_dumps({r0: {'xyz': e3, (1, 2, r4): {'abc': e1}}})
>>> r = traverse_data({r0: {'xyz': e3, (1, 2, r4): {'abc': e1}}})
>>> r[0], r[1] is e1, r[2]
(None, True, False)
"""
traversed_value, error, has_placehoders = _traverse(value)
if error is not None: # don't deref. the error if it exists
return None, error, False
if has_placehoders:
return None, None, True
else:
return dumps(traversed_value), None, False


def _traverse(value):
try:
wait(value)
except TaskError:
Expand All @@ -100,33 +98,41 @@ def _traverse(value):
error = None
if is_result_proxy(value):
value = value.__wrapped__
if isinstance(value, (bytes, unicode)):
if isinstance(value, (bytes, uni)):
return value, None, False
if isinstance(value, collections.Mapping):
d = {}
for k, v in value.items():
k_, e, p1 = _traverse(k)
k_, e, p1 = traverse_data(k)
if e is not None:
error = first(error, e) if error is not None else e
v_, e, p2 = _traverse(v)
v_, e, p2 = traverse_data(v)
if e is not None:
error = first(error, e) if error is not None else e
placeholders = placeholders or p1 or p2
d[k_] = v_
return d, error, placeholders
return (d if error is None and not placeholders else None,
error, placeholders)
if isinstance(value, collections.Iterable):
l = []
for x in value:
x_, e, p = _traverse(x)
x_, e, p = traverse_data(x)
if e is not None:
error = first(error, e) if error is not None else e
placeholders = placeholders or p
l.append(x_)
return tuple(l), error, placeholders
return (tuple(l) if error is None and not placeholders else None,
error, placeholders)
return value, error, placeholders


def dumps(value):
"""Dump a JSON representation of the value, and some extra convenience.
>>> dumps([1, (2,), {u'x': b'abc'}, u'def'])
'[1, [2], {"x": {" b": "YWJj"}}, "def"]'
"""
return json.dumps(_tag(value))


Expand All @@ -137,14 +143,21 @@ def _tag(value):
return {' b': b64encode(value).decode('ascii')}
elif callable(getattr(value, '__json__', None)):
return _tag(value.__json__())
elif isinstance(value, list):
elif isinstance(value, (list, tuple)):
return [_tag(x) for x in value]
elif isinstance(value, dict):
return dict((k, _tag(v)) for k, v in value.items())
return value


def loads(value):
"""Load a JSON value created with dumps:
>>> (loads(dumps([1, (2,), {u'x': b'abc'}, u'def'])) ==
... [1, [2], {u'x': b'abc'}, u'def'])
True
"""
return json.loads(value, object_hook=_obj_hook)


Expand Down
2 changes: 1 addition & 1 deletion flowy/tests/swf_cases.py
Expand Up @@ -583,7 +583,7 @@
'name': 'ArgsStructErrors',
'version': 1,
'errors': {'task-0-0': 'Err1!', },
'expected': {'fail': 'Err!', },
'expected': {'fail': 'Err1!', },
'running': ['task-1-0', ],
}, {
'name': 'ArgsStructErrors',
Expand Down
26 changes: 0 additions & 26 deletions flowy/tests/test_swf.py
Expand Up @@ -291,29 +291,3 @@ def test_first_results(self):
e = error('err!', 3)
p = placeholder()
self.assertEquals(first([e, p, r, t]).__factory__, r.__factory__)


class TestResultJsonTransport(unittest.TestCase):
def _get_uut(self):
from flowy.result import result
return result, serialize_result, deserialize_result

def test_int(self):
r, s, ds = self._get_uut()
self.assertEquals(ds(s(r(10, 0))), 10)

def test_str(self):
r, s, ds = self._get_uut()
self.assertEquals(ds(s(r('abc', 0))), 'abc')

def test_list(self):
r, s, ds = self._get_uut()
self.assertEquals(ds(s(r(['abc', 2, 3], 0))), ['abc', 2, 3])

def test_dict(self):
r, s, ds = self._get_uut()
self.assertEquals(ds(s(r({'a': 1, 'b': 2}, 0))), {'a': 1, 'b': 2})

def test_combined(self):
r, s, ds = self._get_uut()
self.assertEquals(ds(s(r([r(1, 0)], 0))), [1])
6 changes: 3 additions & 3 deletions flowy/worker.py
Expand Up @@ -51,12 +51,12 @@ def __call__(self, key, input_data, decision, *extra_args):
return # Let it timeout
try:
serialized_result = wrapped_func(input_data, *extra_args)
except SuspendTask:
except SuspendTask: # only from workflows
decision.flush()
except TaskError as e:
except TaskError as e: # only from workflows
logger.exception('Unhandled task error in task:')
decision.fail(e)
except Restart as e:
except Restart as e: # only from workflows
decision.restart(e.input_data)
except Exception as e:
logger.exception('Unhandled exception in task:')
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Expand Up @@ -3,7 +3,7 @@ envlist = py27,py33,py34,pypy,cover

[testenv]
commands =
python setup.py -q nosetests
python setup.py -q nosetests --with-doctest

[testenv:py27]
setenv =
Expand Down

0 comments on commit e67ebc0

Please sign in to comment.