Skip to content

Commit

Permalink
Add ActivityInstance and Generator for Activities
Browse files Browse the repository at this point in the history
These two new concepts introduce the ability to run an activity _n_ times
based on a value found in the context. This allows more complex operations, such
as running one activity for each date within a date range. Retry mechanisms are
also a lot more granular.

The ActivityInstance represents an execution of the same activity with a different
context. Each execution of an activity will have at least one instance, and if you
use a generator, it will have n instances.

Adding generators does not change the dependency tree. If you have an activity
that depends on another one, the dependency will wait until all instances have
completed.

@rantonmattei
  • Loading branch information
xethorn committed Feb 11, 2015
1 parent 2a9d0c0 commit 838f6d6
Show file tree
Hide file tree
Showing 6 changed files with 386 additions and 62 deletions.
207 changes: 179 additions & 28 deletions garcon/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,113 @@
Activities are self generated classes to which you can pass an identifier,
and a list of tasks to perform. The activities are in between the decider and
the task.
the tasks.
For ease, two types of task runners are available: Sync and Async. If
you need something more specific, you should either create your own runner, or
you should create a main task that will then split the work.
Create an activity::
from garcon import activity
# First step is to create the workflow on a specific domain.
create = activity.create('domain')
initial_activity = create(
# Name of your activity
name='activity_name',
# List of tasks (here we use the Sync runner)
tasks=runner.Sync(task1),
# No requires since it's the first one. Later in your flow, if you have
# a dependency, just use the variable that contains the activity.
requires=[],
# If the activity fails, number of times you want to retry.
retry=0,
# If you want to run the activity `n` times, you can use generators.
generators=[generator_name])
"""

from threading import Thread
import boto.swf.layer2 as swf
import itertools
import json

from garcon import log
from garcon import utils


ACTIVITY_STANDBY = 0
ACTIVITY_SCHEDULED = 1
ACTIVITY_COMPLETED = 2
ACTIVITY_FAILED = 3


class ActivityInstance:
    """One execution of an activity, bound to its own local context.

    In SWF, the Activity is a worker: it reads information from the context
    and launches activity instances (only one, unless generators are used.)
    An instance derives its key (visible in the SWF console) from its local
    context. Activity instances are owned by an execution.
    """

    def __init__(self, activity_worker, context=None):
        """Bind the instance to its worker and local context.

        Args:
            activity_worker (ActivityWorker): the activity worker that owns
                this specific activity instance.
            context (dict): the local context of the activity (it does not
                include the execution context.) It is usually empty, since
                it only carries the data produced by the generators. Any
                falsy value is replaced by a new empty dict.
        """

        self.activity_worker = activity_worker
        self.context = context if context else {}

    @property
    def activity_name(self):
        """Return the name of the worker that owns this instance.
        """

        return self.activity_worker.name

    @property
    def retry(self):
        """Return the number of retries allowed (same value as the worker.)
        """

        return self.activity_worker.retry

    @property
    def id(self):
        """Build the identifier of this activity instance.

        The id is crucial: it is what allows the state of the activity
        instance to be identified in the event history (whether it has
        failed, been executed, or been marked as completed.)

        Return:
            str: the activity name (task list) and the activity id, joined
                by a dash. The activity id is `1` when the local context is
                empty, otherwise the key computed by
                `utils.create_dictionary_key` from the local context.
        """

        # An empty local context means the activity has a single instance,
        # which always gets the constant id `1`.
        suffix = (
            utils.create_dictionary_key(self.context) if self.context else 1)

        return '{name}-{id}'.format(name=self.activity_name, id=suffix)


class Activity(swf.ActivityWorker, log.GarconLogger):
version = '1.0'
task_list = None
Expand Down Expand Up @@ -82,6 +170,44 @@ def hydrate(self, data):
self.retry = getattr(self, 'retry', None) or data.get('retry', 0)
self.task_list = self.task_list or data.get('task_list')
self.tasks = getattr(self, 'tasks', []) or data.get('tasks')
self.generators = getattr(
self, 'generators', None) or data.get('generators')

def instances(self, context):
    """Yield every instance of this activity for the current context.

    There are two scenarios. When the activity worker has no generators
    (the simplest case), exactly one instance with an empty local context
    is produced. When it has generators, each generator is called with the
    context, and one instance is produced for every combination (cartesian
    product) of the values the generators returned, regardless of the state
    of those instances.

    Args:
        context (dict): the current context, handed to each generator.

    Yield:
        ActivityInstance: the instances of the activity (for the current
            workflow execution.)
    """

    if not self.generators:
        yield ActivityInstance(self)
        return

    # One iterable of local contexts per generator, in declaration order.
    contexts_per_generator = [
        generate(context) for generate in self.generators]

    for combination in itertools.product(*contexts_per_generator):
        # Each generator contributes one context; merge them into a single
        # dict that is used both to create the id of the instance and as
        # its local context. On key collisions the later generator wins.
        merged_context = {
            key: value
            for partial_context in combination
            for key, value in partial_context.items()}

        yield ActivityInstance(self, context=merged_context)

@property
def timeout(self):
Expand Down Expand Up @@ -117,7 +243,7 @@ def __init__(self, flow, activities=None):
"""

self.flow = flow
self.activities = find_activities(self.flow)
self.activities = find_workflow_activities(self.flow)
self.worker_activities = activities

def run(self):
Expand Down Expand Up @@ -159,6 +285,7 @@ def wrapper(**options):
activity.hydrate(dict(
domain=domain,
name=options.get('name'),
generators=options.get('generators', []),
requires=options.get('requires', []),
retry=options.get('retry'),
task_list=domain + '_' + options.get('name'),
Expand All @@ -168,75 +295,99 @@ def wrapper(**options):
return wrapper


def find_available_activities(flow, history):
"""Find all available activities of a flow.
def find_available_activities(flow, history, context):
"""Find all available activity instances of a flow.
The history contains all the information of our activities (their state).
This method focuses on finding all the activities that need to run.
Args:
flow (module): the flow module.
history (dict): the history information.
context (dict): from the context find the available activities.
"""

for activity in find_activities(flow):
for instance in find_activities(flow, context):
# If an event is already available for the activity, it means it is
# not in standby anymore, it's either processing or has been completed.
# The activity is thus not available anymore.
event = history.get(activity.name)
events = history.get(instance.activity_name, {}).get(instance.id)

if event:
if event[-1] != ACTIVITY_FAILED:
if events:
if events[-1] != ACTIVITY_FAILED:
continue
elif (not activity.retry or
activity.retry < count_activity_failures(event)):
elif (not instance.retry or
instance.retry < count_activity_failures(events)):
raise Exception(
'The activity failures has exceeded its retry limit.')

add = True
for requirement in activity.requires:
requirement_evt = history.get(requirement.name) or []
if not ACTIVITY_COMPLETED in requirement_evt:
add = False
break
for requirement in instance.activity_worker.requires:
require_history = history.get(requirement.name)

if add:
yield activity
if not require_history:
return

for requirement_evt in require_history.values():
if not ACTIVITY_COMPLETED in requirement_evt:
return

def find_uncomplete_activities(flow, history):
"""Find uncomplete activities.
yield instance


def find_uncomplete_activities(flow, history, context):
"""Find uncomplete activity instances.
Uncomplete activities are all the activities that are not marked as
completed.
Args:
flow (module): the flow module.
history (dict): the history information.
context (dict): from the context find the available activities.
Yield:
activity: The available activity.
"""

for activity in find_activities(flow):
evts = history.get(activity.name)
for instance in find_activities(flow, context):
evts = history.get(instance.activity_name, {}).get(instance.id)
if not evts or ACTIVITY_COMPLETED not in evts:
yield activity
yield instance


def find_workflow_activities(flow):
"""Retrieves all the activities from a flow
Args:
flow (module): the flow module.
Return:
list: all the activities.
"""

def find_activities(flow):
activities = []
for module_attribute in dir(flow):
current_activity = getattr(flow, module_attribute)
if isinstance(current_activity, Activity):
activities.append(current_activity)
return activities


def find_activities(flow, context):
"""Retrieves all the activities from a flow.
Args:
flow (module): the flow module.
Return:
List of all the activities for the flow.
list: All the activity instances for the flow.
"""

activities = []
for module_attribute in dir(flow):
instance = getattr(flow, module_attribute)
if isinstance(instance, Activity):
activities.append(instance)
current_activity = getattr(flow, module_attribute)

if isinstance(current_activity, Activity):
for activity_instance in current_activity.instances(context):
activities.append(activity_instance)

return activities


Expand Down
20 changes: 12 additions & 8 deletions garcon/decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, flow, register=True):
self.domain = flow.domain
self.task_list = flow.domain + '_decider'
self.version = '1.0'
self.activities = activity.find_activities(flow)
self.activities = activity.find_workflow_activities(flow)

super(DeciderWorker, self).__init__()

Expand Down Expand Up @@ -162,18 +162,22 @@ def run(self):

try:
for current in activity.find_available_activities(
self.flow, activity_states):
self.flow, activity_states, context):

local_activity_context = dict(
context.items() | current.context.items())

decisions.schedule_activity_task(
'%s-%i' % (current.name, time.time()),
current.name,
current.id, # activity id.
current.activity_name,
self.version,
task_list=current.task_list,
input=json.dumps(context),
start_to_close_timeout=current.timeout)
task_list=current.activity_worker.task_list,
input=json.dumps(local_activity_context),
start_to_close_timeout=current.activity_worker.timeout)
else:
activities = list(
activity.find_uncomplete_activities(
self.flow, activity_states))
self.flow, activity_states, context))
if not activities:
decisions.complete_workflow_execution()
except Exception as e:
Expand Down

0 comments on commit 838f6d6

Please sign in to comment.