Skip to content

Commit

Permalink
Merge 5034559 into c10f4aa
Browse files Browse the repository at this point in the history
  • Loading branch information
xethorn committed Feb 19, 2015
2 parents c10f4aa + 5034559 commit 3bbf136
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 18 deletions.
32 changes: 32 additions & 0 deletions garcon/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

from garcon import log
from garcon import utils
from garcon import runner


ACTIVITY_STANDBY = 0
Expand Down Expand Up @@ -111,6 +112,37 @@ def id(self):
name=self.activity_name,
id=activity_id)

def create_execution_input(self, context):
"""Create the input of the activity from the context.
AWS has a limit on the number of characters that can be used (32k). If
you use the `task.decorate`, the data sent to the activity is optimized
to match the values of the context.
Args:
context (dict): the current execution context (which is different
from the activity context.)
Return:
dict: the input to send to the activity.
"""

activity_input = dict()
context = dict(list(context.items()) + list(self.context.items()))

try:
if not getattr(self.activity_worker, 'runner', None):
raise runner.NoRunnerRequirementsFound()

for requirement in self.activity_worker.runner.requirements:
value = context.get(requirement)
if value:
activity_input.update({requirement: value})

except runner.NoRunnerRequirementsFound:
return context
return activity_input


class Activity(swf.ActivityWorker, log.GarconLogger):
version = '1.0'
Expand Down
13 changes: 4 additions & 9 deletions garcon/decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@

from boto.swf.exceptions import SWFDomainAlreadyExistsError
from boto.swf.exceptions import SWFTypeAlreadyExistsError
from threading import Thread
import boto.swf.layer2 as swf
import json
import time

from garcon import activity
from garcon import event
Expand Down Expand Up @@ -117,13 +115,13 @@ def register(self):
version=self.version,
task_list=self.task_list))

for activity in self.activities:
for current_activity in self.activities:
registerables.append(
swf.ActivityType(
domain=self.domain,
name=activity.name,
name=current_activity.name,
version=self.version,
task_list=activity.task_list))
task_list=current_activity.task_list))

for swf_entity in registerables:
try:
Expand Down Expand Up @@ -170,15 +168,12 @@ def run(self):
for current in activity.find_available_activities(
self.flow, activity_states, context):

local_activity_context = dict(
context.items() | current.context.items())

decisions.schedule_activity_task(
current.id, # activity id.
current.activity_name,
self.version,
task_list=current.activity_worker.task_list,
input=json.dumps(local_activity_context),
input=json.dumps(current.create_execution_input(context)),
start_to_close_timeout=current.activity_worker.timeout)
else:
activities = list(
Expand Down
33 changes: 32 additions & 1 deletion garcon/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
DEFAULT_TASK_TIMEOUT = 600 # 10 minutes.


class NoRunnerRequirementsFound(Exception):
pass


class BaseRunner():

def __init__(self, *args):
Expand Down Expand Up @@ -45,11 +49,38 @@ def timeout(self):

return str(timeout)

@property
def requirements(self):
"""Find all the requirements from the list of tasks and return it.
If a task does not use the `task.decorate`, no assumptions can be made
on which values from the context will be used, and it will raise an
exception.
Raise:
NoRequirementFound: The exception when no requirements have been
mentioned in at least one or more tasks.
Return:
set: the list of the required values from the context.
"""

requirements = []

for task in self.tasks:
task_details = getattr(task, '__garcon__', None)
if task_details:
requirements += task_details.get('requirements', [])
else:
raise NoRunnerRequirementsFound()

return set(requirements)

def execute(self, activity, context):
"""Execution of the tasks.
"""

return NotImplementedError
raise NotImplementedError


class Sync(BaseRunner):
Expand Down
4 changes: 4 additions & 0 deletions garcon/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ def wrapper(context, **kwargs):
return response

return namespace_result(response, namespace)

# Keep a record of the requirements value. This allows us to trim the
# size of the context sent to the activity as an input.
_decorate(wrapper, 'requirements', requirements.values())
return wrapper

fn.fill = fill
Expand Down
2 changes: 0 additions & 2 deletions tests/fixtures/flows/example.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from __future__ import print_function
import boto.swf.layer2 as swf

from garcon import activity
from garcon import task
from garcon import runner


Expand Down
2 changes: 0 additions & 2 deletions tests/fixtures/log.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import pytest

from garcon import log


Expand Down
54 changes: 54 additions & 0 deletions tests/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,57 @@ def test_create_activity_instance_id_with_local_context(monkeypatch):

assert instance.id.startswith(activity_mock.name)
assert utils.create_dictionary_key.called


def test_create_activity_instance_input_without_runner(monkeypatch):
"""Test the creation of a context for an activity instance input without
specifying a runner.
"""

activity_mock = MagicMock()
activity_mock.name = 'activity'
activity_mock.runner = None
context = dict(context='yes')
instance = activity.ActivityInstance(activity_mock, context)
resp = instance.create_execution_input(dict())

assert len(resp) == 1
assert resp.get('context') == 'yes'


def test_create_activity_instance_input(monkeypatch):
"""Test the creation of a context for an activity instance input.
"""

@task.decorate()
def task_a(value):
pass

activity_mock = MagicMock()
activity_mock.name = 'activity'
activity_mock.runner = runner.BaseRunner(task_a.fill(value='context'))
instance = activity.ActivityInstance(activity_mock, dict(context='yes'))
resp = instance.create_execution_input(dict(somemore='values'))

assert len(resp) == 1
assert resp.get('context') == 'yes'


def test_create_activity_instance_input_without_decorate(monkeypatch):
"""Test the creation of a context input without the use of a decorator.
"""

def task_a(value):
pass

activity_mock = MagicMock()
activity_mock.name = 'activity'
context = dict(foo='bar')
local_context = dict(context='yes')

activity_mock.runner = runner.BaseRunner(task_a)
instance = activity.ActivityInstance(activity_mock, local_context)

resp = instance.create_execution_input(context)
assert resp.get('foo') == 'bar'
assert resp.get('context') == 'yes'
3 changes: 0 additions & 3 deletions tests/test_decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
except:
from mock import MagicMock
import boto.swf.layer2 as swf
from boto.swf import layer1
import pytest

from garcon import activity
from garcon import decider


Expand Down
1 change: 0 additions & 1 deletion tests/test_log.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import pytest

from garcon import log
from tests.fixtures import log as fixture
Expand Down
37 changes: 37 additions & 0 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@
from garcon import task


def test_execute_default_task_runner():
"""Should throw an exception.
"""

current_runner = runner.BaseRunner()
with pytest.raises(NotImplementedError):
current_runner.execute(None, None)
assert False


def test_synchronous_tasks():
"""Test synchronous tasks.
"""
Expand Down Expand Up @@ -87,3 +97,30 @@ def task_b():

current_runner = runner.BaseRunner(task_a, task_b)
assert current_runner.timeout == str(timeout + runner.DEFAULT_TASK_TIMEOUT)


def test_runner_requirements():
"""Test the requirements for the runner
"""

@task.decorate()
def task_a():
pass

value = 'context.value'
current_runner = runner.BaseRunner(task_a.fill(foo=value))
assert value in current_runner.requirements


def test_runner_requirements_without_decoration():
"""Should just throw an exception.
"""

def task_a():
pass

current_runner = runner.BaseRunner(task_a)

with pytest.raises(runner.NoRunnerRequirementsFound):
current_runner.requirements
assert False

0 comments on commit 3bbf136

Please sign in to comment.