Skip to content

Commit

Permalink
Add support for manual deciders.
Browse files Browse the repository at this point in the history
Some flows are more complex than others. For instance: some activities can
be scheduled based on a condition in the context. Being able to (easily)
write the decider has a lot of advantages.

For this: in your flow, you need to define a decider method that takes a
`schedule` param. To call an activity, all you have to do is to call
`schedule`, provide an immutable id, the activity and a list (if necessary)
of requirements. Example:

```python
def decider(schedule):
    activity_1 = schedule(
        'activity_1', test_activity_1)
    activity_2 = schedule(
        'activity_2', test_activity_2, requires=[activity_1])

    for i in range(3):
        activity_3 = schedule(
            'activity-3.{}'.format(i), test_activity_3, requires=[activity_2])

        if activity_3.ready and activity_2.result.get('4') == 4:
            last_activity = schedule(
                'last_activity.{}'.format(i), test_activity_last,
                input=dict(the='end'))
```

Remember: the decider is called anytime you have an event has occurred. So this
method should only be focused on scheduling (and nothing else).

@rantonmattei
  • Loading branch information
xethorn committed Jul 1, 2015
1 parent eb8fb56 commit 58d3663
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 27 deletions.
39 changes: 36 additions & 3 deletions garcon/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@
DEFAULT_ACTIVITY_SCHEDULE_TO_START = 600 # 10 minutes


class ActivityInstanceNotReadyException(Exception):
"""Exception when an activity instance is not ready.
Activity instances that are considered not ready are instances that have
not completed.
"""

pass


class ActivityInstance:

def __init__(
Expand Down Expand Up @@ -423,9 +433,25 @@ def __init__(self, activity_id):
"""

self.activity_id = activity_id
self.result = None
self._result = None
self.states = []

@property
def result(self):
"""Get the result.
"""

if not self.ready:
raise ActivityInstanceNotReadyException()
return self._result

@property
def ready(self):
"""Check if an activity is ready.
"""

return self.get_last_state() == ACTIVITY_COMPLETED

def get_last_state(self):
"""Get the last state of the activity execution.
Expand Down Expand Up @@ -456,9 +482,16 @@ def set_result(self, result):
result (dict): Result of the activity.
"""

if self.result:
if self._result:
raise Exception('Result is ummutable – it should not be changed.')
self.result = result
self._result = result

def wait():
"""Wait until ready.
"""

if not self.ready():
raise ActivityInstanceNotReadyException()


def worker_runner(worker):
Expand Down
229 changes: 206 additions & 23 deletions garcon/decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from boto.swf.exceptions import SWFDomainAlreadyExistsError
from boto.swf.exceptions import SWFTypeAlreadyExistsError
import boto.swf.layer2 as swf
import functools
import json

from garcon import activity
Expand Down Expand Up @@ -129,6 +130,66 @@ def register(self):
swf_entity.__class__.__name__, swf_entity.name,
'already exists')

def create_decisions_from_flow(self, decisions, activity_states, context):
"""Create the decisions from the flow.
Simple flows don't need a custom decider, since all the requirements
can be provided at the activity level. Discovery of the next activity
to schedule is thus very straightforward.
Args:
decisions (Layer1Decisions): the layer decision for swf.
activity_states (dict): all the state activities.
context (dict): the context of the activities.
"""

try:
for current in activity.find_available_activities(
self.flow, activity_states, context):

schedule_activity_task(
decisions, current, version=self.version)
else:
activities = list(
activity.find_uncomplete_activities(
self.flow, activity_states, context))
if not activities:
decisions.complete_workflow_execution()
except Exception as e:
decisions.fail_workflow_execution(reason=str(e))

def delegate_decisions(self, decisions, decider, history, context):
"""Delegate the decisions.
For more complex flows (the ones that have, for instance, optional
activities), you can write your own decider. The decider receives a
method `schedule` which schedule the activity if not scheduled yet,
and if scheduled, returns its result.
Args:
decisions (Layer1Decisions): the layer decision for swf.
decider (callable): the decider (it needs to have schedule)
history (dict): all the state activities and its history.
context (dict): the context of the activities.
"""

schedule_context = ScheduleContext()
decider_schedule = functools.partial(
schedule, decisions, schedule_context, history, context,
version=self.version)

try:
decider(schedule=decider_schedule)

# When no exceptions are raised and the method decider has returned
# it means that there i nothing left to do in the current decider.
if schedule_context.completed:
decisions.complete_workflow_execution()
except activity.ActivityInstanceNotReadyException:
pass
except Exception as e:
decisions.fail_workflow_execution(reason=str(e))

def run(self):
"""Run the decider.
Expand All @@ -148,6 +209,7 @@ def run(self):
"""

poll = self.poll()
custom_decider = getattr(self.flow, 'decider', None)

if 'events' not in poll:
return True
Expand All @@ -161,29 +223,150 @@ def run(self):
context.update(workflow_execution_info)

decisions = swf.Layer1Decisions()
if not custom_decider:
self.create_decisions_from_flow(
decisions, activity_states, context)
else:
self.delegate_decisions(
decisions, custom_decider, activity_states, context)
self.complete(decisions=decisions)
return True

try:
for current in activity.find_available_activities(
self.flow, activity_states, context):

decisions.schedule_activity_task(
current.id, # activity id.
current.activity_name,
self.version,
task_list=current.activity_worker.task_list,
input=json.dumps(current.create_execution_input()),
heartbeat_timeout=str(current.heartbeat_timeout),
start_to_close_timeout=str(current.timeout),
schedule_to_start_timeout=str(current.schedule_to_start),
schedule_to_close_timeout=str(current.schedule_to_close))
else:
activities = list(
activity.find_uncomplete_activities(
self.flow, activity_states, context))
if not activities:
decisions.complete_workflow_execution()
except Exception as e:
decisions.fail_workflow_execution(reason=str(e))
class ScheduleContext:
"""
Schedule Context
================
self.complete(decisions=decisions)
return True
The schedule context keeps track of all the current scheduling progress – 
which allows to easy determinate if there are more decisions to be taken
or if the execution can be closed.
"""

def __init__(self):
"""Create a schedule context.
"""

self.completed = True

def mark_uncompleted(self):
"""Mark the scheduling as completed.
When a scheduling is completed, it means all the activities have been
properly scheduled and they have all completed.
"""

self.completed = False


def schedule_activity_task(
decisions, instance, version='1.0', id=None):
"""Schedule an activity task.
Args:
decisions (Layer1Decisions): the layer decision for swf.
instance (ActivityInstance): the activity instance to schedule.
version (str): the version of the activity instance.
id (str): optional id of the activity instance.
"""

decisions.schedule_activity_task(
id or instance.id,
instance.activity_name,
version,
task_list=instance.activity_worker.task_list,
input=json.dumps(instance.create_execution_input()),
heartbeat_timeout=str(instance.heartbeat_timeout),
start_to_close_timeout=str(instance.timeout),
schedule_to_start_timeout=str(instance.schedule_to_start),
schedule_to_close_timeout=str(instance.schedule_to_close))


def schedule(
decisions, schedule_context, history, context, schedule_id,
current_activity, requires=None, input=None, version='1.0'):
"""Schedule an activity.
Scheduling an activity requires all the requirements to be completed (all
activities should be marked as completed). The scheduler also mixes the
input with the full execution context to send the data to the activity.
Args:
decisions (Layer1Decisions): the layer decision for swf.
schedule_context (dict): information about the schedule.
history (dict): history of the execution.
context (dict): context of the execution.
schedule_id (str): the id of the activity to schedule.
current_activity (Activity): the activity to run.
requires (list): list of all requirements.
input (dict): additional input for the context.
Throws:
ActivityInstanceNotReadyException: if one of the activity in the
requirements is not ready.
Return:
State: the state of the schedule (contains the response).
"""

ensure_requirements(requires)
activity_completed = set()
result = dict()

instance_context = dict()
instance_context.update(context or {})
instance_context.update(input or {})

for current in current_activity.instances(instance_context):
current_id = '{}-{}'.format(current.id, schedule_id)
states = history.get(current.activity_name, {}).get(current_id)

if states:
if states.get_last_state() == activity.ACTIVITY_COMPLETED:
result.update(states.result or dict())
activity_completed.add(True)
continue

activity_completed.add(False)
schedule_context.mark_uncompleted()

if states.get_last_state() != activity.ACTIVITY_FAILED:
continue
elif (not instance.retry or
instance.retry < count_activity_failures(states)):
raise Exception(
'The activity failures has exceeded its retry limit.')

activity_completed.add(False)
schedule_context.mark_uncompleted()
schedule_activity_task(
decisions, current, id=current_id, version=version)

state = activity.ActivityState(current_activity.name)
state.add_state(activity.ACTIVITY_SCHEDULED)

if len(activity_completed) == 1 and True in activity_completed:
state.add_state(activity.ACTIVITY_COMPLETED)
state.set_result(result)
return state


def ensure_requirements(requires):
"""Ensure scheduling meets requirements.
Verify the state of the requirements to make sure the activity can be
scheduled.
Args:
requires (list): list of all requirements.
Throws:
ActivityInstanceNotReadyException: if one of the activity in the
requirements is not ready.
"""

requires = requires or []
for require in requires:
if (not require or
require.get_last_state() != activity.ACTIVITY_COMPLETED):
raise activity.ActivityInstanceNotReadyException()
4 changes: 3 additions & 1 deletion garcon/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ def activity_states_from_events(events):
activity_event.get('activity_name'), {}).setdefault(
activity_id, activity.ActivityState(activity_id)).add_state(
activity.ACTIVITY_COMPLETED)

result = json.loads(activity_info.get('result') or '{}')
activity_events.get(
activity_event.get('activity_name')).get(
activity_id).set_result(activity_info.get('result'))
activity_id).set_result(result)

return activity_events

Expand Down

0 comments on commit 58d3663

Please sign in to comment.