diff --git a/flowy/tests/test_swf.py b/flowy/tests/test_swf.py index 82469f8..141f224 100644 --- a/flowy/tests/test_swf.py +++ b/flowy/tests/test_swf.py @@ -3,11 +3,20 @@ import unittest from flowy.swf.history import SWFExecutionHistory +from flowy.proxy import Proxy +from flowy.config import ActivityConfig from flowy.tests.swf_cases import worker from flowy.tests.swf_cases import cases +serialize_result = ActivityConfig.serialize_result +deserialize_result = Proxy.deserialize_result +serialize_input = Proxy.serialize_input +deserialize_input = ActivityConfig.deserialize_input + + + class DummyDecision(object): def __init__(self): self.result = None @@ -26,13 +35,13 @@ def flush(self): def restart(self, input_data): if self.result is not None: return - args, kwargs = json.loads(input_data) + args, kwargs = deserialize_input(input_data) self.result = {'restart': {'input_args': args, 'input_kwargs': kwargs}} def finish(self, result): if self.result is not None: return - self.result = {'finish': json.loads(result)} + self.result = {'finish': deserialize_result(result)} default_activity = { 'input_args': [], @@ -47,7 +56,7 @@ def finish(self, result): def schedule_activity(self, call_key, name, version, input_data, task_list, heartbeat, schedule_to_close, schedule_to_start, start_to_close): - args, kwargs = json.loads(input_data) + args, kwargs = deserialize_input(input_data) self.queued['schedule'].append({ 'type': 'activity', 'call_key': call_key, @@ -73,7 +82,7 @@ def schedule_activity(self, call_key, name, version, input_data, task_list, def schedule_workflow(self, call_key, name, version, input_data, task_list, workflow_duration, decision_duration, child_policy): - args, kwargs = json.loads(input_data) + args, kwargs = deserialize_input(input_data) self.queued['schedule'].append({ 'type': 'workflow', 'call_key': call_key, @@ -153,10 +162,10 @@ def t(self): name, version = str(case['name']), str(case['version']) input_args = case.get('input_args', []) input_kwargs = case.get('input_kwargs', {}) - input_data = json.dumps([input_args, input_kwargs]) + input_data = serialize_input(*input_args, **input_kwargs) decision = DummyDecision() results = case.get('results', {}) - results = dict((k, json.dumps(v)) for k, v in results.items()) + results = dict((k, serialize_result(v)) for k, v in results.items()) order = (list(case.get('results', {}).keys()) + list(case.get( 'errors', {}).keys()) + list(case.get('timedout', []))) execution_history = SWFExecutionHistory( @@ -287,9 +296,7 @@ def test_first_results(self): class TestResultJsonTransport(unittest.TestCase): def _get_uut(self): from flowy.result import result - from flowy.config import ActivityConfig - from flowy.proxy import Proxy - return result, ActivityConfig.serialize_result, Proxy.deserialize_result + return result, serialize_result, deserialize_result def test_int(self): r, s, ds = self._get_uut()