Permalink
Browse files

add specs.Transform.

  • Loading branch information...
1 parent f6b71f4 commit 7ac0a05b9ddb0605a207c994cd640004dc88ec69 @knipknap knipknap committed Apr 10, 2012
Showing with 114 additions and 3 deletions.
  1. +4 −3 README
  2. +64 −0 SpiffWorkflow/specs/Transform.py
  3. +1 −0 SpiffWorkflow/specs/__init__.py
  4. +45 −0 tests/SpiffWorkflow/specs/TransformTest.py
View
7 README
@@ -1,5 +1,5 @@
Spiff Workflow
----------------
+==============
Spiff Workflow is a library implementing a framework for workflows.
It is based on http://www.workflowpatterns.com and implemented in pure Python.
@@ -62,9 +62,10 @@ Hint: The examples are located in tests/data/spiff/.
10. Block Task to Sub-Workflow Decomposition [data/block_to_subworkflow.xml]
11. Sub-Workflow Decomposition to Block Task [data/subworkflow_to_block.xml]
- Other Patterns:
+ Specs that have no corresponding workflow pattern on workflowpatterns.com:
- 1. Execute - spawns a subprocess and waits for the results
+ Execute - spawns a subprocess and waits for the results
+ Transform - executes commands that can be used for data transforms
Contact
View
64 SpiffWorkflow/specs/Transform.py
@@ -0,0 +1,64 @@
+# Copyright (C) 2007 Samuel Abels
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+from SpiffWorkflow.Task import Task
+from SpiffWorkflow.exceptions import WorkflowException
+from SpiffWorkflow.specs.TaskSpec import TaskSpec
+
+class Transform(TaskSpec):
+ """
+ This class implements a task that transforms input/output data.
+ """
+
+ def __init__(self, parent, name, transforms=None, **kwargs):
+ """
+ Constructor.
+
+ @type parent: TaskSpec
+ @param parent: A reference to the parent task spec.
+ @type name: str
+ @param name: The name of the task spec.
+ @type transforms: list
+ @param transforms: The commands that this task will execute to
+ transform data. The commands will be executed using the
+ python 'exec' function. Accessing inputs and outputs is
+ achieved by referencing the my_task.* and self.*
+ variables'
+ @type kwargs: dict
+ @param kwargs: See L{SpiffWorkflow.specs.TaskSpec}.
+ """
+ assert parent is not None
+ assert name is not None
+ TaskSpec.__init__(self, parent, name, **kwargs)
+ self.transforms = transforms
+
+
+ def _on_complete_hook(self, my_task):
+ if self.transforms:
+ for transform in self.transforms:
+ exec(transform)
+ return TaskSpec._on_complete_hook(self, my_task)
+
+ def serialize(self, serializer):
+ s_state = serializer._serialize_simple(self)
+ s_state['transforms'] = self.transforms
+ return s_state
+
+ @classmethod
+ def deserialize(self, serializer, wf_spec, s_state):
+ spec = Transform(wf_spec, m_state['name'])
+ spec.transforms = s_state['transforms']
+ serializer._deserialize_simple(spec, s_state)
+ return spec
View
1 SpiffWorkflow/specs/__init__.py
@@ -14,6 +14,7 @@
from SubWorkflow import SubWorkflow
from ThreadMerge import ThreadMerge
from ThreadSplit import ThreadSplit
+from Transform import Transform
from Trigger import Trigger
from WorkflowSpec import WorkflowSpec
View
45 tests/SpiffWorkflow/specs/TransformTest.py
@@ -0,0 +1,45 @@
+import sys, unittest, re, os
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))
+
+from tests.SpiffWorkflow.util import run_workflow
+from TaskSpecTest import TaskSpecTest
+from SpiffWorkflow import Task
+from SpiffWorkflow.specs import Transform, Simple
+
+class TransformTest(TaskSpecTest):
+ CORRELATE = Transform
+
+ def create_instance(self):
+ return Transform(self.wf_spec,
+ 'testtask',
+ description = 'foo',
+ transforms = [''])
+
+ def testPattern(self):
+ """
+ Tests that we can create a task that executes an shell command
+ and that the workflow can be called to complete such tasks.
+ """
+ task1 = Transform(self.wf_spec, 'First', transforms=[
+ "my_task.set_attribute(foo=1)"])
+ self.wf_spec.start.connect(task1)
+ task2 = Transform(self.wf_spec, 'Second', transforms=[
+ "my_task.set_attribute(foo=my_task.attributes['foo']+1)",
+ "my_task.set_attribute(copy=my_task.attributes['foo'])"
+ ])
+ task1.connect(task2)
+ task3 = Simple(self.wf_spec, 'Last')
+ task2.connect(task3)
+
+ expected = 'Start\n First\n Second\n Last\n'
+ workflow = run_workflow(self, self.wf_spec, expected, '')
+ first = workflow.get_task(3)
+ last = workflow.get_task(5)
+ self.assertEqual(first.attributes.get('foo'), 1)
+ self.assertEqual(last.attributes.get('foo'), 2)
+ self.assertEqual(last.attributes.get('copy'), 2)
+
+def suite():
+ return unittest.TestLoader().loadTestsFromTestCase(TransformTest)
+if __name__ == '__main__':
+ unittest.TextTestRunner(verbosity = 2).run(suite())

0 comments on commit 7ac0a05

Please sign in to comment.