Skip to content

Commit

Permalink
Add the schedule_to_start and schedule_to_close timeouts.
Browse files Browse the repository at this point in the history
Those two timeouts are calculated based on the assumption that there is only
one worker per activity. So if the user has set the schedule to start timeout
of the activity to be 10 minutes and a generator spawns 10 activities, the
schedule to start will be 100 minutes.

The schedule to close timeout is the sum of the schedule to start timeout and
the start to close timeout.

@rantonmattei
  • Loading branch information
xethorn committed Apr 27, 2015
1 parent 63b70b9 commit 801e431
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 4 deletions.
50 changes: 47 additions & 3 deletions garcon/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
ACTIVITY_COMPLETED = 2
ACTIVITY_FAILED = 3

DEFAULT_ACTIVITY_SCHEDULE_TO_START = 600 # 10 minutes


class ActivityInstance:

Expand Down Expand Up @@ -196,12 +198,20 @@ def hydrate(self, data):
data (dict): the data to use (if defined.)
"""

self.pool_size = 0
self.name = self.name or data.get('name')
self.domain = getattr(self, 'domain', '') or data.get('domain')
self.requires = getattr(self, 'requires', []) or data.get('requires')
self.retry = getattr(self, 'retry', None) or data.get('retry', 0)
self.task_list = self.task_list or data.get('task_list')

# The start timeout is how long it will take between the scheduling
# of the activity and the start of the activity.
self.schedule_to_start_timeout = (
getattr(self, 'schedule_to_start_timeout', None) or
data.get('schedule_to_start') or
DEFAULT_ACTIVITY_SCHEDULE_TO_START)

# The previous way to create an activity was to fill a `tasks` param,
# which is now `run`.
self.runner = (
Expand Down Expand Up @@ -230,22 +240,56 @@ def instances(self, context):
"""

if not self.generators:
self.pool_size = 1
yield ActivityInstance(self)
return

generator_values = []
for generator in self.generators:
generator_values.append(generator(context))

for generator_contexts in itertools.product(*generator_values):
contexts = list(itertools.product(*generator_values))
self.pool_size = len(contexts)
for generator_contexts in contexts:
# Each generator returns a context, merge all the contexts
# to only be one - which can be used to 1/ create the id of the
# activity and 2/ be passed as a local context.
instance_context = dict()
for current_generator_context in generator_contexts:
instance_context.update(current_generator_context.items())

yield ActivityInstance(self, context=instance_context)
yield ActivityInstance(
self, context=instance_context)

@property
def schedule_to_start(self):
"""Return the schedule to start timeout.
The schedule to start timeout assumes that only one activity worker is
available (since swf does not provide a count of available workers). So
if the default value is 5 minutes, and you have 10 instances: the
schedule to start will be 50 minutes for all instances.
Return:
int: Schedule to start timeout.
"""

return self.pool_size * self.schedule_to_start_timeout

@property
def schedule_to_close(self):
"""Return the schedule to close timeout.
The schedule to close timeout defines when an activity should end, counted
from the moment it has been scheduled. It is the sum of the schedule to
start timeout and the activity timeout.
Return:
int: Schedule to close timeout.
"""

return self.schedule_to_start + int(self.timeout)

@property
def timeout(self):
Expand Down Expand Up @@ -347,7 +391,7 @@ def wrapper(**options):
task_list=activity_name,
tasks=options.get('tasks'),
run=options.get('run'),
))
schedule_to_start=options.get('schedule_to_start')))
return activity
return wrapper

Expand Down
7 changes: 6 additions & 1 deletion garcon/decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,12 @@ def run(self):
task_list=current.activity_worker.task_list,
input=json.dumps(current.create_execution_input(context)),
heartbeat_timeout=current.activity_worker.heartbeat_timeout,
start_to_close_timeout=current.activity_worker.timeout)
start_to_close_timeout=str(
current.activity_worker.timeout),
schedule_to_start_timeout=str(
current.activity_worker.schedule_to_start),
schedule_to_close_timeout=str(
current.activity_worker.schedule_to_close))
else:
activities = list(
activity.find_uncomplete_activities(
Expand Down
34 changes: 34 additions & 0 deletions tests/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,40 @@ def test_instances_creation(monkeypatch, generators):
assert not instances[0].context


def test_activity_timeouts(monkeypatch, generators):
"""Test the creation of an activity's timeouts.
More details: the timeout of a task is 120s, the schedule to start is 1000,
and 100 activities are going to be scheduled when the generator is set. The
schedule_to_start for all activity instances is: 1000 * 100 = 100k. The
schedule to close is 100k + the duration of an activity (which is 120s * 2).
"""

timeout = 120
start_timeout = 1000

@task.decorate(timeout=timeout)
def local_task():
return

monkeypatch.setattr(activity.Activity, '__init__', lambda self: None)
current_activity = activity.Activity()
current_activity.hydrate(dict(schedule_to_start=start_timeout))
current_activity.generators = generators
current_activity.runner = runner.Sync(
local_task.fill(),
local_task.fill())

total_generators = pow(10, len(current_activity.generators))
for instance in current_activity.instances({}):
schedule_to_start = start_timeout * total_generators
assert current_activity.pool_size == total_generators
assert current_activity.schedule_to_start == schedule_to_start
assert current_activity.timeout == str(timeout * 2)
assert current_activity.schedule_to_close == (
schedule_to_start + int(current_activity.timeout))


def test_worker_run(monkeypatch):
"""Test running the worker.
"""
Expand Down

0 comments on commit 801e431

Please sign in to comment.