Skip to content

Commit

Permalink
External Activities
Browse files Browse the repository at this point in the history
SWF allows any activities to be written in any language (polyglot.)
Garcon for now was locking all the activities to be written in python,
this change allows activities to be external workers (written in any
language.)
  • Loading branch information
xethorn committed May 11, 2015
1 parent d7bb30f commit 1ba2ee3
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 15 deletions.
36 changes: 36 additions & 0 deletions garcon/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,36 @@ def instances(self, context):
self, execution_context=context, local_context=instance_context)


class ExternalActivity(Activity):
"""External activity
One of the main advantages of SWF is the ability to write a workflow that
has activities written in any languages. The external activity class allows
to write the workflow in Garcon and benefit from some features (timeout
calculation among other things, sending context data.)
"""

def __init__(self, timeout=None, heartbeat=None):
"""Create the External Activity.
Args:
timeout (int): activity timeout in seconds (mandatory)
heartbeat (int): heartbeat timeout in seconds, if not defined, it will
be equal to the timeout.
"""

self.runner = runner.External(timeout=timeout, heartbeat=heartbeat)

def run(self):
"""Run the external activity.
This activity is handled outside, so the run method should remain
unimplemented and return False (so the run loop stops.)
"""

return False


class ActivityWorker():

def __init__(self, flow, activities=None):
Expand Down Expand Up @@ -397,6 +427,12 @@ def create(domain, name):

def wrapper(**options):
activity = Activity()

if options.get('external'):
activity = ExternalActivity(
timeout=options.get('timeout'),
heartbeat=options.get('heartbeat'))

activity_name = '{name}_{activity}'.format(
name=name,
activity=options.get('name'))
Expand Down
17 changes: 17 additions & 0 deletions garcon/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,20 @@ def execute(self, activity, context):
data = future.result()
result.update(data or {})
return result


class External(BaseRunner):

def __init__(self, timeout=None, heartbeat=None):
"""Create the External Runner.
Args:
timeout (int): activity timeout in seconds (mandatory)
heartbeat (int): heartbeat timeout in seconds, if not defined, it will
be equal to the timeout.
"""

assert timeout, 'External runner requires a timeout.'

self.timeout = lambda ctx=None: timeout
self.heartbeat = lambda ctx=None: (heartbeat or timeout)
112 changes: 97 additions & 15 deletions tests/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,43 @@ def test_hydrate_activity(monkeypatch):
tasks=[lambda: dict('val')]))


def test_create_activity(monkeypatch):
"""Test the creation of an activity via `create`.
"""

monkeypatch.setattr(activity.Activity, '__init__', lambda self: None)
create = activity.create('domain_name', 'flow_name')

current_activity = create(name='activity_name')
assert isinstance(current_activity, activity.Activity)
assert current_activity.name == 'flow_name_activity_name'
assert current_activity.task_list == 'flow_name_activity_name'
assert current_activity.domain == 'domain_name'


def test_create_external_activity(monkeypatch):
"""Test the creation of an external activity via `create`.
"""

monkeypatch.setattr(activity.Activity, '__init__', lambda self: None)
create = activity.create('domain_name', 'flow_name')

current_activity = create(
name='activity_name',
timeout=60,
heartbeat=40,
external=True)

assert isinstance(current_activity, activity.ExternalActivity)
assert current_activity.name == 'flow_name_activity_name'
assert current_activity.task_list == 'flow_name_activity_name'
assert current_activity.domain == 'domain_name'

assert isinstance(current_activity.runner, runner.External)
assert current_activity.runner.heartbeat() == 40
assert current_activity.runner.timeout() == 60


def test_create_activity_worker(monkeypatch):
"""Test the creation of an activity worker.
"""
Expand All @@ -196,23 +233,26 @@ def test_instances_creation(monkeypatch, generators):

monkeypatch.setattr(activity.Activity, '__init__', lambda self: None)

current_activity = activity.Activity()
current_activity.generators = generators
local_activity = activity.Activity()
external_activity = activity.ExternalActivity(timeout=60)

if len(current_activity.generators):
instances = list(current_activity.instances(dict()))
assert len(instances) == pow(10, len(generators))
for instance in instances:
assert isinstance(instance.local_context.get('i'), int)
for current_activity in [local_activity, external_activity]:
current_activity.generators = generators

if len(generators) == 2:
assert isinstance(instance.local_context.get('d'), int)
else:
instances = list(current_activity.instances(dict()))
assert len(instances) == 1
assert isinstance(instances[0].local_context, dict)
# Context is empty since no generator was used.
assert not instances[0].local_context
if len(current_activity.generators):
instances = list(current_activity.instances(dict()))
assert len(instances) == pow(10, len(generators))
for instance in instances:
assert isinstance(instance.local_context.get('i'), int)

if len(generators) == 2:
assert isinstance(instance.local_context.get('d'), int)
else:
instances = list(current_activity.instances(dict()))
assert len(instances) == 1
assert isinstance(instances[0].local_context, dict)
# Context is empty since no generator was used.
assert not instances[0].local_context


def test_activity_timeouts(monkeypatch, generators):
Expand Down Expand Up @@ -249,6 +289,28 @@ def local_task():
schedule_to_start + instance.timeout)


def test_external_activity_timeouts(monkeypatch, generators):
"""Test the creation of an external activity timeouts.
"""

timeout = 120
start_timeout = 1000

monkeypatch.setattr(activity.Activity, '__init__', lambda self: None)
current_activity = activity.ExternalActivity(timeout=timeout)
current_activity.hydrate(dict(schedule_to_start=start_timeout))
current_activity.generators = generators

total_generators = pow(10, len(current_activity.generators))
schedule_to_start = start_timeout * total_generators
for instance in current_activity.instances({}):
assert current_activity.pool_size == total_generators
assert instance.schedule_to_start == schedule_to_start
assert instance.timeout == timeout
assert instance.schedule_to_close == (
schedule_to_start + instance.timeout)


def test_worker_run(monkeypatch):
"""Test running the worker.
"""
Expand Down Expand Up @@ -317,6 +379,26 @@ def run(self):
assert spy.call_count == 5


def test_worker_infinite_loop_on_external(monkeypatch):
"""There is no worker for external activities.
"""

external_activity = activity.ExternalActivity(timeout=10)
current_run = external_activity.run
spy = MagicMock()

def run():
spy()
return current_run()

monkeypatch.setattr(external_activity, 'run', run)
activity.worker_runner(external_activity)

# This test might not fail, but it will hang the test suite since it is
# going to trigger an infinite loop.
assert spy.call_count == 1


def test_activity_launch_sequence():
"""Test available activities.
"""
Expand Down

0 comments on commit 1ba2ee3

Please sign in to comment.