Skip to content

Commit

Permalink
Fix instance context.
Browse files Browse the repository at this point in the history
The issue:

   The instance context is only representing the generator context.
   If this context is passed to the task.list, this context will appear
   empty.

Fix:

   Provide the execution context gathered by the decider to fill the
   necessary information

@rantonmattei
  • Loading branch information
xethorn committed Apr 28, 2015
1 parent 03a0dca commit 4b87f31
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 29 deletions.
40 changes: 21 additions & 19 deletions garcon/activity.py
Expand Up @@ -57,7 +57,8 @@

class ActivityInstance:

def __init__(self, activity_worker, context=None):
def __init__(
self, activity_worker, local_context=None, execution_context=None):
"""Activity Instance.
In SWF, Activity is a worker: it will get information from the context,
Expand All @@ -69,14 +70,20 @@ def __init__(self, activity_worker, context=None):
Args:
activity_worker (ActivityWorker): The activity worker that owns
this specific Activity Instance.
context (dict): the local context of the activity (it does not
include the execution context.) Most times the context will be
empty since it is only filled with data that comes from the
local_context (dict): the local context of the activity (it does
not include the execution context.) Most times the context will
be empty since it is only filled with data that comes from the
generators.
execution_context (dict): the execution context of when an activity
will be scheduled with.
"""

self.activity_worker = activity_worker
self.context = context or dict()
self.execution_context = execution_context or dict()
self.local_context = local_context or dict()
self.global_context = dict(
list(self.execution_context.items()) +
list(self.local_context.items()))

@property
def activity_name(self):
Expand Down Expand Up @@ -105,10 +112,10 @@ def id(self):
id.
"""

if not self.context:
if not self.local_context:
activity_id = 1
else:
activity_id = utils.create_dictionary_key(self.context)
activity_id = utils.create_dictionary_key(self.local_context)

return '{name}-{id}'.format(
name=self.activity_name,
Expand Down Expand Up @@ -158,7 +165,7 @@ def timeout(self):
int: Task list timeout.
"""

return self.runner.timeout(self.context)
return self.runner.timeout(self.global_context)

@property
def heartbeat_timeout(self):
Expand All @@ -172,7 +179,7 @@ def heartbeat_timeout(self):
int: Task list timeout.
"""

return self.runner.heartbeat(self.context)
return self.runner.heartbeat(self.global_context)

@property
def runner(self):
Expand All @@ -192,32 +199,27 @@ def runner(self):
raise runner.RunnerMissing()
return activity_runner

def create_execution_input(self, context):
def create_execution_input(self):
"""Create the input of the activity from the context.
AWS has a limit on the number of characters that can be used (32k). If
you use the `task.decorate`, the data sent to the activity is optimized
to match the values of the context.
Args:
context (dict): the current execution context (which is different
from the activity context.)
Return:
dict: the input to send to the activity.
"""

activity_input = dict()
context = dict(list(context.items()) + list(self.context.items()))

try:
for requirement in self.runner.requirements(self.context):
value = context.get(requirement)
for requirement in self.runner.requirements(self.global_context):
value = self.global_context.get(requirement)
if value:
activity_input.update({requirement: value})

except runner.NoRunnerRequirementsFound:
return context
return self.global_context
return activity_input


Expand Down Expand Up @@ -334,7 +336,7 @@ def instances(self, context):
instance_context.update(current_generator_context.items())

yield ActivityInstance(
self, context=instance_context)
self, execution_context=context, local_context=instance_context)


class ActivityWorker():
Expand Down
2 changes: 1 addition & 1 deletion garcon/decider.py
Expand Up @@ -172,7 +172,7 @@ def run(self):
current.activity_name,
self.version,
task_list=current.activity_worker.task_list,
input=json.dumps(current.create_execution_input(context)),
input=json.dumps(current.create_execution_input()),
heartbeat_timeout=str(current.heartbeat_timeout),
start_to_close_timeout=str(current.timeout),
schedule_to_start_timeout=str(current.schedule_to_start),
Expand Down
22 changes: 13 additions & 9 deletions tests/test_activity.py
Expand Up @@ -203,16 +203,16 @@ def test_instances_creation(monkeypatch, generators):
instances = list(current_activity.instances(dict()))
assert len(instances) == pow(10, len(generators))
for instance in instances:
assert isinstance(instance.context.get('i'), int)
assert isinstance(instance.local_context.get('i'), int)

if len(generators) == 2:
assert isinstance(instance.context.get('d'), int)
assert isinstance(instance.local_context.get('d'), int)
else:
instances = list(current_activity.instances(dict()))
assert len(instances) == 1
assert isinstance(instances[0].context, dict)
assert isinstance(instances[0].local_context, dict)
# Context is empty since no generator was used.
assert not instances[0].context
assert not instances[0].local_context


def test_activity_timeouts(monkeypatch, generators):
Expand Down Expand Up @@ -441,7 +441,7 @@ def test_create_activity_instance_input_without_runner(monkeypatch):
instance = activity.ActivityInstance(activity_mock, context)

with pytest.raises(runner.RunnerMissing):
instance.create_execution_input(dict())
instance.create_execution_input()


def test_create_activity_instance_input(monkeypatch):
Expand All @@ -455,8 +455,10 @@ def task_a(value):
activity_mock = MagicMock()
activity_mock.name = 'activity'
activity_mock.runner = runner.BaseRunner(task_a.fill(value='context'))
instance = activity.ActivityInstance(activity_mock, dict(context='yes'))
resp = instance.create_execution_input(dict(somemore='values'))
instance = activity.ActivityInstance(
activity_mock, local_context=dict(context='yes'),
execution_context=dict(somemore='values'))
resp = instance.create_execution_input()

assert len(resp) == 1
assert resp.get('context') == 'yes'
Expand All @@ -475,8 +477,10 @@ def task_a(value):
local_context = dict(context='yes')

activity_mock.runner = runner.BaseRunner(task_a)
instance = activity.ActivityInstance(activity_mock, local_context)
instance = activity.ActivityInstance(
activity_mock, local_context=local_context,
execution_context=context)

resp = instance.create_execution_input(context)
resp = instance.create_execution_input()
assert resp.get('foo') == 'bar'
assert resp.get('context') == 'yes'

0 comments on commit 4b87f31

Please sign in to comment.