From 8d9dda789c1f65d8da902fefcab8044a5d9a53c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sever=20B=C4=83ne=C5=9Fiu?= Date: Fri, 3 Jul 2015 17:53:26 +0300 Subject: [PATCH] Traverse. --- flowy/config.py | 12 +++-- flowy/proxy.py | 4 +- flowy/serialization.py | 116 +++++++++++++++++++++++++++-------------- 3 files changed, 87 insertions(+), 45 deletions(-) diff --git a/flowy/config.py b/flowy/config.py index f8664ee..3f93d4d 100644 --- a/flowy/config.py +++ b/flowy/config.py @@ -8,6 +8,7 @@ from flowy.result import restart_type from flowy.result import TaskError from flowy.serialization import dumps +from flowy.serialization import traverse_dumps from flowy.serialization import loads from flowy.utils import logger @@ -59,7 +60,7 @@ def deserialize_input(input_data): @staticmethod def serialize_result(result): - """Serialize and as a side effect, raise any SuspendTask/TaskErrors.""" + """Serialize the result.""" return dumps(result) def __call__(self, key=None): @@ -144,8 +145,13 @@ def __init__(self, deserialize_input=None, serialize_result=None, self.proxy_factory_registry = {} def serialize_restart_input(self, *args, **kwargs): - """Serialize and as a side effect, raise any SuspendTask/TaskErrors.""" - return dumps([args, kwargs]) + """Try to serialize the result, returns any errors or placeholders.""" + return traverse_dumps([args, kwargs]) + + @staticmethod + def serialize_result(result): + """Try to serialize the result, returns any errors or placeholders.""" + return traverse_dumps(result) def _check_dep(self, dep_name): """Check if dep_name is a unique valid identifier name.""" diff --git a/flowy/proxy.py b/flowy/proxy.py index 7ca2e98..b7590e1 100644 --- a/flowy/proxy.py +++ b/flowy/proxy.py @@ -8,7 +8,7 @@ from flowy.result import SuspendTask from flowy.result import timeout from flowy.result import wait -from flowy.serialization import dumps +from flowy.serialization import traverse_dumps from flowy.serialization import loads from flowy.utils import logger @@ -115,7 +115,7 @@ def __call__(self, *args, **kwargs): @staticmethod def serialize_input(*args, **kwargs): - return dumps([args, kwargs]) + return traverse_dumps([args, kwargs]) @staticmethod def deserialize_result(result): diff --git a/flowy/serialization.py b/flowy/serialization.py index 8b6daa6..1efa2ba 100644 --- a/flowy/serialization.py +++ b/flowy/serialization.py @@ -6,28 +6,30 @@ task results. Any serializer is supposed to do that. """ +import collections import json import uuid from base64 import b64decode from base64 import b64encode -from flowy.result import is_result_proxy +from flowy.result import is_result_proxy, TaskError, SuspendTask, wait +from flowy.operations import first -__all__ = ['loads', 'dumps'] +__all__ = ['traverse_dumps', 'dumps', 'loads'] -def dumps(value): +def traverse_dumps(value): """Serialize the data structure and checks for errors or placeholders. Returns a 3-tuple: serialized data, oldest error, placehoders flag Serializing values should work as expected - >>> dumps(1) + >>> traverse_dumps(1) ('1', None, False) - >>> dumps(u'abc') + >>> traverse_dumps(u'abc') ('"abc"', None, False) - >>> dumps([1, 2, 3, [4]]) + >>> traverse_dumps([1, 2, 3, [4]]) ('[1, 2, 3, [4]]', None, False) >>> from flowy.result import error, placeholder, result @@ -39,72 +41,106 @@ def dumps(value): >>> ph = placeholder() Results work just like values - >>> dumps([r0, r4]) + >>> traverse_dumps([r0, r4]) ('["r0", "r4"]', None, False) - >>> dumps({r0: r4}) + >>> traverse_dumps({r0: r4}) ('{"r0": "r4"}', None, False) + >>> traverse_dumps((1, 2, 'a', r0)) + ('[1, 2, "a", "r0"]', None, False) Any placeholder should be detected - >>> dumps(ph) - None, None, True - >>> dumps([r0, [r4, [ph]]]) - None, None, True - >>> dumps({'a': {r0: {ph: 'b'}}}) - None, None, True - >>> dumps([[[ph], ph]]) - None, None, True + >>> traverse_dumps(ph) + (None, None, True) + >>> traverse_dumps([r0, [r4, [ph]]]) + (None, None, True) + >>> traverse_dumps({'a': {r0: {'b': ph}}}) + (None, None, True) + >>> traverse_dumps([[[ph], ph]]) + (None, None, True) (Oldest) Error has priority over placeholders flag - >>> r = dumps(e1) + >>> r = traverse_dumps(e1) >>> r[0], r[1] is e1, r[2] - None, True, False + (None, True, False) - >>> r = dumps([e3, e1, e2]) + >>> r = traverse_dumps([e3, e1, e2]) >>> r[0], r[1] is e1, r[2] - None, True, False + (None, True, False) - >>> r = dumps([e3, [e1, [e2]]]) + >>> r = traverse_dumps([e3, [e1, [e2]]]) >>> r[0], r[1] is e1, r[2] - None, True, False + (None, True, False) - >>> r = dumps([r0, e2, r4) + >>> r = traverse_dumps([r0, e2, r4]) >>> r[0], r[1] is e2, r[2] - None, True, False + (None, True, False) - >>> r = dumps({r0: {'xyz': {e3: {e2: e1}}}}) + >>> r = traverse_dumps({r0: {'xyz': e3, (1, 2, r4): {'abc': e1}}}) >>> r[0], r[1] is e1, r[2] - None, True, False + (None, True, False) """ traversed_value, error, has_placehoders = _traverse(value) - if error: + if error is not None: # don't deref. the error if it exists return None, error, False if has_placehoders: return None, None, True else: - return json.dumps(traversed_value), None, False + return dumps(traversed_value), None, False + def _traverse(value): + try: + wait(value) + except TaskError: + return None, value, False + except SuspendTask: + return None, None, True + placeholders = False + error = None if is_result_proxy(value): value = value.__wrapped__ - if isinstance(value, tuple): - return [_traverse(x) for x in value] - elif isinstance(value, uuid.UUID): + if isinstance(value, (bytes, unicode)): + return value, None, False + if isinstance(value, collections.Mapping): + d = {} + for k, v in value.items(): + k_, e, p1 = _traverse(k) + if e is not None: + error = first(error, e) if error is not None else e + v_, e, p2 = _traverse(v) + if e is not None: + error = first(error, e) if error is not None else e + placeholders = placeholders or p1 or p2 + d[k_] = v_ + return d, error, placeholders + if isinstance(value, collections.Iterable): + l = [] + for x in value: + x_, e, p = _traverse(x) + if e is not None: + error = first(error, e) if error is not None else e + placeholders = placeholders or p + l.append(x_) + return tuple(l), error, placeholders + return value, error, placeholders + + +def dumps(value): + return json.dumps(_tag(value)) + + +def _tag(value): + if isinstance(value, uuid.UUID): return {' u': value.hex} elif isinstance(value, bytes): return {' b': b64encode(value).decode('ascii')} elif callable(getattr(value, '__json__', None)): - return _traverse(value.__json__()) + return _tag(value.__json__()) elif isinstance(value, list): - return [_traverse(x) for x in value] + return [_tag(x) for x in value] elif isinstance(value, dict): - d = {} - for k, v in value.items(): - if is_result_proxy(k): - k = k.__wrapped__ - v = _traverse(v) - d[k] = v - return d + return dict((k, _tag(v)) for k, v in value.items()) return value