Skip to content

Commit

Permalink
Added first_* tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
severb committed Nov 13, 2014
1 parent bec995e commit 7748f88
Showing 1 changed file with 74 additions and 1 deletion.
75 changes: 74 additions & 1 deletion flowy/tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ def set_state(self, running=[], timedout=[], results={}, errors={},
order=None):
self.scheduler = DummyScheduler()
self.Workflow = self.make_workflow()
order = list(range(10000))
if order is None:
order = list(range(100000))
self.workflow = self.Workflow(self.scheduler, '[[], {}]', 'token',
running, timedout, results, errors,
order, None, None)
Expand Down Expand Up @@ -469,3 +470,75 @@ def test_a_doesnt_block_when_finished(self):
('ACTIVITY', self.Workflow.b._spec, 4, '[["b_input"], {}]'),
'FLUSH'
)


class TestFristOfMany(TestWorkflowBase):

def make_workflow(self):
from flowy.task import _SWFWorkflow
from flowy.proxy import SWFActivityProxy

class MyWorkflow(_SWFWorkflow):

x = SWFActivityProxy(name='a', version=1, retry=0,
error_handling=True)

def run(self):
a = self.x()
b = self.x()
c = self.x()
return self.first_result(a, b, c)

return MyWorkflow

def test_first_result_returns_a(self):
self.set_state(results={0: '1', 1: '2', 2: '3'}, order=[0, 1, 2])
self.assert_scheduled(
('COMPLETE', '1')
)

def test_first_result_returns_b(self):
self.set_state(errors={0: "err"}, results={1: '2'}, timedout=[2],
order=[1, 0, 2])
self.assert_scheduled(
('COMPLETE', '2')
)

def test_first_error(self):
self.set_state(errors={2: '3'}, running=[0, 1], order=[2])
self.assert_scheduled(
('FAIL', '3'),
)

def test_first_to(self):
self.set_state(timedout=[2], running=[0, 1], order=[2])
msg = ("Activity SWFActivitySpec(name='a', version=1, task_list=None,"
" heartbeat=None, schedule_to_close=None,"
" schedule_to_start=None, start_to_close=None) has timed-out")
self.assert_scheduled(
('FAIL', msg),
)

class TestFristNOfMany(TestWorkflowBase):
def make_workflow(self):
from flowy.task import _SWFWorkflow
from flowy.proxy import SWFActivityProxy

class MyWorkflow(_SWFWorkflow):

x = SWFActivityProxy(name='a', version=1, retry=0,
error_handling=True)

def run(self):
a = self.x()
b = self.x()
c = self.x()
return self.first_results(2, a, b, c)

return MyWorkflow

def test_first_result_returns_a(self):
self.set_state(results={0: '1', 1: '2', 2: '3'}, order=[2, 1, 0])
self.assert_scheduled(
('COMPLETE', '[3, 2]')
)

0 comments on commit 7748f88

Please sign in to comment.