Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Add 'Execute' Task Spec

  • Loading branch information...
commit 1c96c12eac0f8fcdf9e25cebb042087209485e9e 1 parent 6cb935b
Ziad Sawalha authored
4 README
@@ -62,6 +62,10 @@ Hint: The examples are located in tests/data/spiff/.
62 62 10. Block Task to Sub-Workflow Decomposition [data/block_to_subworkflow.xml]
63 63 11. Sub-Workflow Decomposition to Block Task [data/subworkflow_to_block.xml]
64 64
  65 + Other Patterns:
  66 +
  67 + 1. Execute - spawns a subprocess and waits for the results
  68 +
65 69
66 70 Contact
67 71 --------
85 SpiffWorkflow/specs/Execute.py
... ... @@ -0,0 +1,85 @@
  1 +# Copyright (C) 2007 Samuel Abels
  2 +#
  3 +# This library is free software; you can redistribute it and/or
  4 +# modify it under the terms of the GNU Lesser General Public
  5 +# License as published by the Free Software Foundation; either
  6 +# version 2.1 of the License, or (at your option) any later version.
  7 +#
  8 +# This library is distributed in the hope that it will be useful,
  9 +# but WITHOUT ANY WARRANTY; without even the implied warranty of
  10 +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11 +# Lesser General Public License for more details.
  12 +#
  13 +# You should have received a copy of the GNU Lesser General Public
  14 +# License along with this library; if not, write to the Free Software
  15 +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  16 +import subprocess
  17 +
  18 +from SpiffWorkflow.Task import Task
  19 +from SpiffWorkflow.exceptions import WorkflowException
  20 +from SpiffWorkflow.specs.TaskSpec import TaskSpec
  21 +
  22 +
  23 +class Execute(TaskSpec):
  24 + """
  25 + This class executes an external process, goes into WAITING until the
  26 + process is complete, and returns the results of the execution.
  27 +
  28 + Usage:
  29 +
  30 + task = Execute(spec, 'Ping', args=["ping", "-t", "1", "127.0.0.1"])
  31 + ... when workflow complete
  32 + print workflow.get_task('Ping').results
  33 + """
  34 +
  35 + def __init__(self, parent, name, args=None, **kwargs):
  36 + """
  37 + Constructor.
  38 +
  39 + @type parent: TaskSpec
  40 + @param parent: A reference to the parent task spec.
  41 + @type name: str
  42 + @param name: The name of the task spec.
  43 + @type args: list
  44 + @param args: args to pass to process (first arg is the command).
  45 + @type kwargs: dict
  46 + @param kwargs: kwargs to pass-through to TaskSpec initializer.
  47 + """
  48 + assert parent is not None
  49 + assert name is not None
  50 + TaskSpec.__init__(self, parent, name, **kwargs)
  51 + self.args = args
  52 +
  53 + def try_fire(self, my_task, force = False):
  54 + """Returns False when successfully fired, True otherwise"""
  55 + if (not hasattr(my_task, 'subprocess')) or my_task.subprocess is None:
  56 + my_task.subprocess = subprocess.Popen(self.args,
  57 + stderr=subprocess.STDOUT,
  58 + stdout=subprocess.PIPE)
  59 +
  60 + if my_task.subprocess:
  61 + my_task.subprocess.poll()
  62 + if my_task.subprocess.returncode is None:
  63 + # Still waiting
  64 + return False
  65 + else:
  66 + results = my_task.subprocess.communicate()
  67 + my_task.results = results
  68 + return True
  69 + return False
  70 +
  71 + def _update_state_hook(self, my_task):
  72 + if not self.try_fire(my_task):
  73 + my_task.state = Task.WAITING
  74 + result = False
  75 + else:
  76 + result = super(Execute, self)._update_state_hook(my_task)
  77 + return result
  78 +
  79 + def serialize(self, serializer):
  80 + return serializer._serialize_execute(self)
  81 +
  82 + @classmethod
  83 + def deserialize(self, serializer, wf_spec, s_state):
  84 + spec = serializer._deserialize_execute(wf_spec, s_state)
  85 + return spec
1  SpiffWorkflow/specs/__init__.py
@@ -3,6 +3,7 @@
3 3 from CancelTask import CancelTask
4 4 from Choose import Choose
5 5 from ExclusiveChoice import ExclusiveChoice
  6 +from Execute import Execute
6 7 from Gate import Gate
7 8 from Join import Join
8 9 from MultiChoice import MultiChoice
10 SpiffWorkflow/storage/DictionarySerializer.py
@@ -183,6 +183,16 @@ def _deserialize_exclusive_choice(self, wf_spec, s_state):
183 183 spec.default_task_spec = s_state['default_task_spec']
184 184 return spec
185 185
  186 + def _serialize_execute(self, spec):
  187 + s_state = self._serialize_task_spec(spec)
  188 + s_state['args'] = spec.args
  189 + return s_state
  190 +
  191 + def _deserialize_execute(self, wf_spec, s_state):
  192 + spec = Execute(wf_spec, s_state['name'], s_state['args'])
  193 + self._deserialize_task_spec(wf_spec, s_state, spec = spec)
  194 + return spec
  195 +
186 196 def _serialize_gate(self, spec):
187 197 s_state = self._serialize_task_spec(spec)
188 198 s_state['context'] = spec.context
33 tests/SpiffWorkflow/TaskTest.py
... ... @@ -1,9 +1,11 @@
  1 +import time
1 2 import sys, unittest, re, os.path
2 3 sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
3 4
4 5 from SpiffWorkflow import Task
5   -from SpiffWorkflow.Workflow import TaskIdAssigner
6   -from SpiffWorkflow.specs import WorkflowSpec, Simple
  6 +from SpiffWorkflow.Workflow import TaskIdAssigner, Workflow
  7 +from SpiffWorkflow.specs import WorkflowSpec, Simple, Execute
  8 +from SpiffWorkflow.specs.TaskSpec import TaskSpec
7 9 from SpiffWorkflow.exceptions import WorkflowException
8 10
9 11 class MockWorkflow(object):
@@ -69,6 +71,33 @@ def testTree(self):
69 71 'Expected:\n' + expected2 + '\n' + \
70 72 'but got:\n' + result)
71 73
  74 + def test_execute(self):
  75 + """Tests that we can create a task that executes an shell command
  76 + and that the workflow can be called to complete such tasks"""
  77 + spec = WorkflowSpec()
  78 + task1 = Execute(spec, 'Ping', args=["ping", "-t", "1", "127.0.0.1"])
  79 + spec.start.connect(task1)
  80 + workflow = Workflow(spec)
  81 +
  82 + i = 0
  83 + while not workflow.is_completed() and i < 10:
  84 + workflow.complete_all()
  85 + i += 1
  86 + time.sleep(0.5)
  87 + self.assertTrue(workflow.is_completed())
  88 + self.assertEqual(i, 3)
  89 + task = workflow.get_task(3)
  90 + self.assertEquals(task.state_history, [1, 8, 16, 64])
  91 + # Check whether the status log is accurate.
  92 + expected = """Moving 'Ping' from FUTURE to WAITING
  93 +Moving 'Ping' from WAITING to READY
  94 +Moving 'Ping' from READY to COMPLETED"""
  95 + self.assert_(expected == '\n'.join(task.log),
  96 + 'Expected:\n' + expected + '\n' + \
  97 + 'but got:\n' + '\n'.join(task.log))
  98 + self.assertIn('127.0.0.1', task.results[0])
  99 +
  100 +
72 101 def suite():
73 102 return unittest.TestLoader().loadTestsFromTestCase(TaskTest)
74 103 if __name__ == '__main__':

0 comments on commit 1c96c12

Please sign in to comment.
Something went wrong with that request. Please try again.