Skip to content

Commit

Permalink
Add heartbeat.
Browse files Browse the repository at this point in the history
Heartbeat is set by default by AWS (to 600 seconds, which is
10 minutes. If an activity last longer than 10 minutes but
does not issue a heartbeat - SWF considers this activity has
failed.

The heartbeat mechanism works on our end at task level. We
gather all the different heartbeats (if one activity misses
the heartbeat, it is set to the SWF default.) – and figure
out which one is the longuest. We use this one as a
reference for our task.

Additional notes:
* Everytime an activity spawns a new task, the heartbeat is
triggered.
* Everytime an activity finishes a task, a heartbeat is
triggered (this is for the async task runner.)

@rantonmattei
  • Loading branch information
xethorn committed Apr 13, 2015
1 parent b62437c commit a38789b
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 10 deletions.
14 changes: 14 additions & 0 deletions garcon/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,20 @@ def timeout(self):

return self.runner.timeout

@property
def heartbeat_timeout(self):
"""Return the heartbeat in seconds.
This heartbeat corresponds on when an activity needs to send a signal
to swf that it is still running. This will set the value when the
activity is scheduled.
Return:
int: Task list timeout.
"""

return self.runner.heartbeat


class ActivityWorker():

Expand Down
1 change: 1 addition & 0 deletions garcon/decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def run(self):
self.version,
task_list=current.activity_worker.task_list,
input=json.dumps(current.create_execution_input(context)),
heartbeat_timeout=current.activity_worker.heartbeat_timeout,
start_to_close_timeout=current.activity_worker.timeout)
else:
activities = list(
Expand Down
37 changes: 35 additions & 2 deletions garcon/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
============
The task runners are responsible for running all the tasks (either in series
or in parallel). There's only one task runner per activity. The base is
or in parallel). There's only one task runner per activity.
"""

from concurrent import futures
from concurrent.futures import ThreadPoolExecutor


DEFAULT_TASK_TIMEOUT = 600 # 10 minutes.
DEFAULT_TASK_HEARTBEAT = 600 # 10 minutes


class NoRunnerRequirementsFound(Exception):
Expand Down Expand Up @@ -49,6 +50,37 @@ def timeout(self):

return str(timeout)

@property
def heartbeat(self):
"""Calculate and return the heartbeat for an activity.
The heartbeat represents when an actvitity should be sending a signal
to SWF that it has not completed yet. The heartbeat is sent everytime
a new task is going to be launched.
Similar to the `BaseRunner.timeout`, the heartbeat is pessimistic, it
looks at the largest heartbeat and set it up.
Return:
str: The heartbeat timeout (boto requires the timeout to be a string
not a regular number.)
"""

heartbeat = 0

for task in self.tasks:
task_details = getattr(task, '__garcon__', None)
task_heartbeat = DEFAULT_TASK_HEARTBEAT

if task_details:
task_heartbeat = task_details.get(
'heartbeat', DEFAULT_TASK_HEARTBEAT)

if task_heartbeat > heartbeat:
heartbeat = task_heartbeat

return str(heartbeat)

@property
def requirements(self):
"""Find all the requirements from the list of tasks and return it.
Expand All @@ -73,7 +105,6 @@ def requirements(self):
requirements += task_details.get('requirements', [])
else:
raise NoRunnerRequirementsFound()

return set(requirements)

def execute(self, activity, context):
Expand All @@ -88,6 +119,7 @@ class Sync(BaseRunner):
def execute(self, activity, context):
result = dict()
for task in self.tasks:
activity.heartbeat()
task_context = dict(list(result.items()) + list(context.items()))
resp = task(task_context, activity=activity)
result.update(resp or dict())
Expand All @@ -108,6 +140,7 @@ def execute(self, activity, context):
tasks.append(executor.submit(task, context, activity=activity))

for future in futures.as_completed(tasks):
activity.heartbeat()
data = future.result()
result.update(data or {})
return result
16 changes: 12 additions & 4 deletions garcon/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import copy


def decorate(timeout=None, enable_contextify=True):
def decorate(timeout=None, heartbeat=None, enable_contextify=True):
"""Generic task decorator for tasks.
Args:
timeout (int): The timeout of the task (see timeout).
heartbeat (int): The heartbeat timeout.
contextify (boolean): If the task can be contextified (see contextify).
Return:
callable: The wrapper.
"""
Expand All @@ -35,23 +35,31 @@ def wrapper(fn):
if timeout:
_decorate(fn, 'timeout', timeout)

# If the task does not have a heartbeat, but instead the task has
# a timeout, the heartbeat should be adjusted to the timeout. In
# most case, most people will probably opt for this option.
if heartbeat or timeout:
_decorate(fn, 'heartbeat', heartbeat or timeout)

if enable_contextify:
contextify(fn)

return fn

return wrapper


def timeout(time):
def timeout(time, heartbeat=None):
"""Wrapper for a task to define its timeout.
Args:
time (int): the timeout in seconds
heartbeat (int): the heartbeat timeout (in seconds too.)
"""

def wrapper(fn):
_decorate(fn, 'timeout', time)
if heartbeat:
_decorate(fn, 'heartbeat', heartbeat)
return fn

return wrapper
Expand Down
2 changes: 2 additions & 0 deletions tests/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ def test_execute_activity(monkeypatch):
"""

monkeypatch.setattr(activity.Activity, '__init__', lambda self: None)
monkeypatch.setattr(activity.Activity, 'heartbeat', lambda self: None)

resp = dict(task_resp='something')
custom_task = MagicMock(return_value=resp)

Expand Down
1 change: 1 addition & 0 deletions tests/test_decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def test_create_decider(monkeypatch):
dec = decider.DeciderWorker(example, register=False)
assert not dec.register.called


def test_get_workflow_execution_info(monkeypatch):
"""Check that the workflow execution info are properly extracted
"""
Expand Down
68 changes: 64 additions & 4 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pytest

from garcon import activity
from garcon import runner
from garcon import task

Expand All @@ -20,14 +21,20 @@ def test_execute_default_task_runner():
assert False


def test_synchronous_tasks():
def test_synchronous_tasks(monkeypatch):
"""Test synchronous tasks.
"""

monkeypatch.setattr(activity.Activity, '__init__', lambda self: None)
monkeypatch.setattr(activity.Activity, 'heartbeat', lambda self: None)

resp = dict(foo='bar')
current_runner = runner.Sync(
MagicMock(), MagicMock(return_value=resp))
result = current_runner.execute(None, dict())
current_activity = activity.Activity()
current_activity.hydrate(dict(runner=current_runner))

result = current_runner.execute(current_activity, dict())

assert len(current_runner.tasks) == 2

Expand All @@ -37,10 +44,13 @@ def test_synchronous_tasks():
assert resp == result


def test_aynchronous_tasks():
def test_aynchronous_tasks(monkeypatch):
"""Test asynchronous tasks.
"""

monkeypatch.setattr(activity.Activity, '__init__', lambda self: None)
monkeypatch.setattr(activity.Activity, 'heartbeat', lambda self: None)

tasks = [MagicMock() for i in range(5)]
tasks[2].return_value = dict(oi='mondo')
tasks[4].return_value = dict(bonjour='monde')
Expand All @@ -54,8 +64,11 @@ def test_aynchronous_tasks():
assert current_runner.max_workers == workers
assert len(current_runner.tasks) == len(tasks)

current_activity = activity.Activity()
current_activity.hydrate(dict(runner=current_runner))

context = dict(hello='world')
resp = current_runner.execute(None, context)
resp = current_runner.execute(current_activity, context)

for current_task in tasks:
assert current_task.called
Expand All @@ -71,6 +84,14 @@ def test_calculate_timeout_with_no_tasks():
assert task_list.timeout == '0'


def test_calculate_heartbeat_with_no_tasks():
"""Task list without tasks has no heartbeat.
"""

task_list = runner.BaseRunner()
assert task_list.heartbeat == '0'


def test_calculate_default_timeout():
"""Tasks that do not have a set timeout get the default timeout.
"""
Expand All @@ -79,6 +100,14 @@ def test_calculate_default_timeout():
assert task_list.timeout == str(runner.DEFAULT_TASK_TIMEOUT)


def test_calculate_default_heartbeat():
"""Tasks that do not have a set timeout get the default timeout.
"""

task_list = runner.BaseRunner(lambda x: x)
assert task_list.heartbeat == str(runner.DEFAULT_TASK_HEARTBEAT)


def test_calculate_timeout():
"""Check methods that have set timeout.
"""
Expand Down Expand Up @@ -106,6 +135,37 @@ def task_c():
assert current_runner.timeout == str(timeout + runner.DEFAULT_TASK_TIMEOUT)


def test_calculate_heartbeat():
"""Check methods that have set timeout.
"""

@task.decorate(heartbeat=5)
def task_a():
pass

current_runner = runner.BaseRunner(task_a)
assert current_runner.heartbeat == str(5)

@task.decorate(heartbeat=3)
def task_b():
pass

current_runner = runner.BaseRunner(task_b)
assert current_runner.heartbeat == str(3)

@task.decorate(heartbeat=4498)
def task_c():
pass

def task_d():
pass

current_runner = runner.BaseRunner(
task_a, task_b, task_c, task_d)
assert current_runner.heartbeat == str(4498)



def test_runner_requirements():
"""Test the requirements for the runner
"""
Expand Down
29 changes: 29 additions & 0 deletions tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@ def test():
assert test.__garcon__.get('timeout') == timeout


def test_timeout_decorator_with_heartbeat():
"""Test the timeout decorator with heartbeat.
"""

timeout = 20
heartbeat = 30

@task.timeout(timeout, heartbeat=heartbeat)
def test():
pass

assert test.__garcon__.get('heartbeat') == heartbeat
assert test.__garcon__.get('timeout') == timeout


def test_decorator():
"""Test the Decorator.
Expand Down Expand Up @@ -88,12 +103,26 @@ def test(user):
assert user is userinfo

assert test.__garcon__.get('timeout') == timeout
assert test.__garcon__.get('heartbeat') == timeout
assert callable(test.fill)

call = test.fill(user='user')
call(dict(user='something'))


def test_task_decorator_with_heartbeat():
"""Test the task decorator with heartbeat.
"""

heartbeat = 50

@task.decorate(heartbeat=heartbeat)
def test(user):
assert user is userinfo

assert test.__garcon__.get('heartbeat') == heartbeat


def test_task_decorator_with_activity():
"""Test the task decorator with an activity.
"""
Expand Down

0 comments on commit a38789b

Please sign in to comment.