Skip to content

Commit

Permalink
Traverse.
Browse files Browse the repository at this point in the history
  • Loading branch information
severb committed Jul 3, 2015
1 parent 0536062 commit 8d9dda7
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 45 deletions.
12 changes: 9 additions & 3 deletions flowy/config.py
Expand Up @@ -8,6 +8,7 @@
from flowy.result import restart_type
from flowy.result import TaskError
from flowy.serialization import dumps
from flowy.serialization import traverse_dumps
from flowy.serialization import loads
from flowy.utils import logger

Expand Down Expand Up @@ -59,7 +60,7 @@ def deserialize_input(input_data):

@staticmethod
def serialize_result(result):
"""Serialize and as a side effect, raise any SuspendTask/TaskErrors."""
"""Serialize the result."""
return dumps(result)

def __call__(self, key=None):
Expand Down Expand Up @@ -144,8 +145,13 @@ def __init__(self, deserialize_input=None, serialize_result=None,
self.proxy_factory_registry = {}

def serialize_restart_input(self, *args, **kwargs):
"""Serialize and as a side effect, raise any SuspendTask/TaskErrors."""
return dumps([args, kwargs])
"""Try to serialize the result, returns any errors or placeholders."""
return traverse_dumps([args, kwargs])

@staticmethod
def serialize_result(result):
"""Try to serialize the result, returns any errors or placeholders."""
return traverse_dumps(result)

def _check_dep(self, dep_name):
"""Check if dep_name is a unique valid identifier name."""
Expand Down
4 changes: 2 additions & 2 deletions flowy/proxy.py
Expand Up @@ -8,7 +8,7 @@
from flowy.result import SuspendTask
from flowy.result import timeout
from flowy.result import wait
from flowy.serialization import dumps
from flowy.serialization import traverse_dumps
from flowy.serialization import loads
from flowy.utils import logger

Expand Down Expand Up @@ -115,7 +115,7 @@ def __call__(self, *args, **kwargs):

@staticmethod
def serialize_input(*args, **kwargs):
return dumps([args, kwargs])
return traverse_dumps([args, kwargs])

@staticmethod
def deserialize_result(result):
Expand Down
116 changes: 76 additions & 40 deletions flowy/serialization.py
Expand Up @@ -6,28 +6,30 @@
task results. Any serializer is supposed to do that.
"""

import collections
import json
import uuid
from base64 import b64decode
from base64 import b64encode

from flowy.result import is_result_proxy
from flowy.result import is_result_proxy, TaskError, SuspendTask, wait
from flowy.operations import first


__all__ = ['loads', 'dumps']
__all__ = ['traverse_dumps', 'dumps', 'loads']


def dumps(value):
def traverse_dumps(value):
"""Serialize the data structure and checks for errors or placeholders.
Returns a 3-tuple: serialized data, oldest error, placehoders flag
Serializing values should work as expected
>>> dumps(1)
>>> traverse_dumps(1)
('1', None, False)
>>> dumps(u'abc')
>>> traverse_dumps(u'abc')
('"abc"', None, False)
>>> dumps([1, 2, 3, [4]])
>>> traverse_dumps([1, 2, 3, [4]])
('[1, 2, 3, [4]]', None, False)
>>> from flowy.result import error, placeholder, result
Expand All @@ -39,72 +41,106 @@ def dumps(value):
>>> ph = placeholder()
Results work just like values
>>> dumps([r0, r4])
>>> traverse_dumps([r0, r4])
('["r0", "r4"]', None, False)
>>> dumps({r0: r4})
>>> traverse_dumps({r0: r4})
('{"r0": "r4"}', None, False)
>>> traverse_dumps((1, 2, 'a', r0))
('[1, 2, "a", "r0"]', None, False)
Any placeholder should be detected
>>> dumps(ph)
None, None, True
>>> dumps([r0, [r4, [ph]]])
None, None, True
>>> dumps({'a': {r0: {ph: 'b'}}})
None, None, True
>>> dumps([[[ph], ph]])
None, None, True
>>> traverse_dumps(ph)
(None, None, True)
>>> traverse_dumps([r0, [r4, [ph]]])
(None, None, True)
>>> traverse_dumps({'a': {r0: {'b': ph}}})
(None, None, True)
>>> traverse_dumps([[[ph], ph]])
(None, None, True)
(Oldest) Error has priority over placeholders flag
>>> r = dumps(e1)
>>> r = traverse_dumps(e1)
>>> r[0], r[1] is e1, r[2]
None, True, False
(None, True, False)
>>> r = dumps([e3, e1, e2])
>>> r = traverse_dumps([e3, e1, e2])
>>> r[0], r[1] is e1, r[2]
None, True, False
(None, True, False)
>>> r = dumps([e3, [e1, [e2]]])
>>> r = traverse_dumps([e3, [e1, [e2]]])
>>> r[0], r[1] is e1, r[2]
None, True, False
(None, True, False)
>>> r = dumps([r0, e2, r4)
>>> r = traverse_dumps([r0, e2, r4])
>>> r[0], r[1] is e2, r[2]
None, True, False
(None, True, False)
>>> r = dumps({r0: {'xyz': {e3: {e2: e1}}}})
>>> r = traverse_dumps({r0: {'xyz': e3, (1, 2, r4): {'abc': e1}}})
>>> r[0], r[1] is e1, r[2]
None, True, False
(None, True, False)
"""
traversed_value, error, has_placehoders = _traverse(value)
if error:
if error is not None: # don't deref. the error if it exists
return None, error, False
if has_placehoders:
return None, None, True
else:
return json.dumps(traversed_value), None, False
return dumps(traversed_value), None, False


def _traverse(value):
try:
wait(value)
except TaskError:
return None, value, False
except SuspendTask:
return None, None, True
placeholders = False
error = None
if is_result_proxy(value):
value = value.__wrapped__
if isinstance(value, tuple):
return [_traverse(x) for x in value]
elif isinstance(value, uuid.UUID):
if isinstance(value, (bytes, unicode)):
return value, None, False
if isinstance(value, collections.Mapping):
d = {}
for k, v in value.items():
k_, e, p1 = _traverse(k)
if e is not None:
error = first(error, e) if error is not None else e
v_, e, p2 = _traverse(v)
if e is not None:
error = first(error, e) if error is not None else e
placeholders = placeholders or p1 or p2
d[k_] = v_
return d, error, placeholders
if isinstance(value, collections.Iterable):
l = []
for x in value:
x_, e, p = _traverse(x)
if e is not None:
error = first(error, e) if error is not None else e
placeholders = placeholders or p
l.append(x_)
return tuple(l), error, placeholders
return value, error, placeholders


def dumps(value):
return json.dumps(_tag(value))


def _tag(value):
if isinstance(value, uuid.UUID):
return {' u': value.hex}
elif isinstance(value, bytes):
return {' b': b64encode(value).decode('ascii')}
elif callable(getattr(value, '__json__', None)):
return _traverse(value.__json__())
return _tag(value.__json__())
elif isinstance(value, list):
return [_traverse(x) for x in value]
return [_tag(x) for x in value]
elif isinstance(value, dict):
d = {}
for k, v in value.items():
if is_result_proxy(k):
k = k.__wrapped__
v = _traverse(v)
d[k] = v
return d
return dict((k, _tag(v)) for k, v in value.items())
return value


Expand Down

0 comments on commit 8d9dda7

Please sign in to comment.