Skip to content

Commit

Permalink
Use the same serialization/deserialization funcs in tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
severb committed Jun 29, 2015
1 parent e4137eb commit 6a58df5
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions flowy/tests/test_swf.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@
import unittest

from flowy.swf.history import SWFExecutionHistory
from flowy.proxy import Proxy
from flowy.config import ActivityConfig

from flowy.tests.swf_cases import worker
from flowy.tests.swf_cases import cases


serialize_result = ActivityConfig.serialize_result
deserialize_result = Proxy.deserialize_result
serialize_input = Proxy.serialize_input
deserialize_input = ActivityConfig.deserialize_input



class DummyDecision(object):
def __init__(self):
self.result = None
Expand All @@ -26,13 +35,13 @@ def flush(self):
def restart(self, input_data):
if self.result is not None:
return
args, kwargs = json.loads(input_data)
args, kwargs = deserialize_input(input_data)
self.result = {'restart': {'input_args': args, 'input_kwargs': kwargs}}

def finish(self, result):
if self.result is not None:
return
self.result = {'finish': json.loads(result)}
self.result = {'finish': deserialize_result(result)}

default_activity = {
'input_args': [],
Expand All @@ -47,7 +56,7 @@ def finish(self, result):
def schedule_activity(self, call_key, name, version, input_data, task_list,
heartbeat, schedule_to_close, schedule_to_start,
start_to_close):
args, kwargs = json.loads(input_data)
args, kwargs = deserialize_input(input_data)
self.queued['schedule'].append({
'type': 'activity',
'call_key': call_key,
Expand All @@ -73,7 +82,7 @@ def schedule_activity(self, call_key, name, version, input_data, task_list,

def schedule_workflow(self, call_key, name, version, input_data, task_list,
workflow_duration, decision_duration, child_policy):
args, kwargs = json.loads(input_data)
args, kwargs = deserialize_input(input_data)
self.queued['schedule'].append({
'type': 'workflow',
'call_key': call_key,
Expand Down Expand Up @@ -153,10 +162,10 @@ def t(self):
name, version = str(case['name']), str(case['version'])
input_args = case.get('input_args', [])
input_kwargs = case.get('input_kwargs', {})
input_data = json.dumps([input_args, input_kwargs])
input_data = serialize_input(*input_args, **input_kwargs)
decision = DummyDecision()
results = case.get('results', {})
results = dict((k, json.dumps(v)) for k, v in results.items())
results = dict((k, serialize_result(v)) for k, v in results.items())
order = (list(case.get('results', {}).keys()) + list(case.get(
'errors', {}).keys()) + list(case.get('timedout', [])))
execution_history = SWFExecutionHistory(
Expand Down Expand Up @@ -287,9 +296,7 @@ def test_first_results(self):
class TestResultJsonTransport(unittest.TestCase):
def _get_uut(self):
from flowy.result import result
from flowy.config import ActivityConfig
from flowy.proxy import Proxy
return result, ActivityConfig.serialize_result, Proxy.deserialize_result
return result, serialize_result, deserialize_result

def test_int(self):
r, s, ds = self._get_uut()
Expand Down

0 comments on commit 6a58df5

Please sign in to comment.