Skip to content

Commit

Permalink
Add ExecutionContext
Browse files Browse the repository at this point in the history
The execution context allows to separate the different elements that
forms the full execution context: from the workflow execution input
to the full context.

@rantonmattei
  • Loading branch information
xethorn committed Oct 20, 2015
1 parent c308321 commit d1323af
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 72 deletions.
104 changes: 104 additions & 0 deletions garcon/context.py
@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
"""
Context
=======
Context carries information that have been retrieved from the different SWF
events of an execution.
"""

import json


class ExecutionContext:

def __init__(self, events=None):
"""Create the execution context.
An execution context gathers the execution input and the result of all
the activities that have successfully ran. It also adds the execution
input into the mix (for logger purposes).
Args:
events (list): optional list of all the events.
"""

self.current = {}
self.workflow_input = {}

if events:
for event in events:
self.add(event)

def add(self, event):
"""Add an event into the execution context.
The events are the ones coming from SWF directly (so the fields are the
ones we expect).
Args:
event (dict): the event to add to the context.
"""

event_type = event.get('eventType')
if event_type == 'ActivityTaskCompleted':
self.add_activity_result(event)
elif event_type == 'WorkflowExecutionStarted':
self.set_execution_input(event)

def set_workflow_execution_info(self, execution_info, domain):
"""Add the workflow execution info.
Workflow execution info contains the domain, workflow id and run id.
This allows the logger to properly namespace the messages and
facilitate debugging.
Args:
execution_info (dict): the execution information.
domain (str): the current domain
"""

if ('workflowExecution' in execution_info and
'workflowId' in execution_info['workflowExecution'] and
'runId' in execution_info['workflowExecution']):

workflow_execution = execution_info['workflowExecution']
self.current.update({
'execution.domain': domain,
'execution.workflow_id': workflow_execution['workflowId'],
'execution.run_id': workflow_execution['runId']
})

def set_execution_input(self, execution_event):
"""Add the workflow execution input.
Please note the input within the execution event should always be a
json string.
Args:
execution_event (str): the execution event information.
"""

attributes = execution_event['workflowExecutionStartedEventAttributes']
result = attributes.get('input')
if result:
result = json.loads(result)
self.workflow_input = result
self.current.update(result)

def add_activity_result(self, activity_event):
"""Add an activity result.
Please note: the result of an activity event should always be a json
string.
Args:
activity_event (str): json object that represents the activity
information.
"""

attributes = activity_event['activityTaskCompletedEventAttributes']
result = attributes.get('result')

if result:
self.current.update(json.loads(result))
48 changes: 14 additions & 34 deletions garcon/decider.py
Expand Up @@ -61,29 +61,6 @@ def get_history(self, poll):
# Remove all the events that are related to decisions and only.
return [e for e in events if not e['eventType'].startswith('Decision')]

def get_workflow_execution_info(self, poll):
"""Get the workflow execution info from a given poll if it exists.
Args:
poll (object): The poll object (see AWS SWF for details.)
Return:
`dict`: Workflow execution info including domain, workflowId and
runId.
"""

execution_info = None
if ('workflowExecution' in poll and 'workflowId' in
poll['workflowExecution'] and 'runId' in
poll['workflowExecution']):

workflow_execution = poll['workflowExecution']
execution_info = {
'execution.domain': self.domain,
'execution.workflow_id': workflow_execution['workflowId'],
'execution.run_id': workflow_execution['runId']
}

return execution_info

def get_activity_states(self, history):
"""Get the activity states from the history.
Expand Down Expand Up @@ -146,14 +123,14 @@ def create_decisions_from_flow(self, decisions, activity_states, context):

try:
for current in activity.find_available_activities(
self.flow, activity_states, context):
self.flow, activity_states, context.current):

schedule_activity_task(
decisions, current, version=self.version)
else:
activities = list(
activity.find_uncomplete_activities(
self.flow, activity_states, context))
self.flow, activity_states, context.current))
if not activities:
decisions.complete_workflow_execution()
except Exception as e:
Expand All @@ -178,11 +155,17 @@ def delegate_decisions(self, decisions, decider, history, context):

schedule_context = ScheduleContext()
decider_schedule = functools.partial(
schedule, decisions, schedule_context, history, context,
schedule, decisions, schedule_context, history, context.current,
version=self.version)

try:
decider(schedule=decider_schedule)
kwargs = dict(schedule=decider_schedule)

# retro-compatibility.
if 'context' in decider.__code__.co_varnames:
kwargs.update(context=context.workflow_input)

decider(**kwargs)

# When no exceptions are raised and the method decider has returned
# it means that there i nothing left to do in the current decider.
Expand Down Expand Up @@ -221,19 +204,16 @@ def run(self):

history = self.get_history(poll)
activity_states = self.get_activity_states(history)
workflow_execution_info = self.get_workflow_execution_info(poll)
context = event.get_current_context(history)

if workflow_execution_info is not None:
context.update(workflow_execution_info)
current_context = event.get_current_context(history)
current_context.set_workflow_execution_info(poll, self.domain)

decisions = swf.Layer1Decisions()
if not custom_decider:
self.create_decisions_from_flow(
decisions, activity_states, context)
decisions, activity_states, current_context)
else:
self.delegate_decisions(
decisions, custom_decider, activity_states, context)
decisions, custom_decider, activity_states, current_context)
self.complete(decisions=decisions)
return True

Expand Down
21 changes: 3 additions & 18 deletions garcon/event.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
from garcon import activity
from garcon import context
import json


Expand Down Expand Up @@ -90,21 +91,5 @@ def get_current_context(events):
"""

events = sorted(events, key=lambda item: item.get('eventId'))
context = {}

for event in events:
event_type = event.get('eventType')
result = None

if event_type == 'ActivityTaskCompleted':
attributes = event['activityTaskCompletedEventAttributes']
result = attributes.get('result')

if event_type == 'WorkflowExecutionStarted':
attributes = event['workflowExecutionStartedEventAttributes']
result = attributes.get('input')

if result:
context.update(json.loads(result))

return context
execution_context = context.ExecutionContext(events)
return execution_context
2 changes: 1 addition & 1 deletion tests/fixtures/decider.py
Expand Up @@ -121,7 +121,7 @@
'eventTimestamp': 1419351026.803,
'eventType': 'DecisionTaskCompleted'},
{'activityTaskCompletedEventAttributes': {
'scheduledEventId': 11, 'startedEventId': 16},
'scheduledEventId': 11, 'startedEventId': 16, 'result': '{"k": "v"}'},
'eventId': 19,
'eventTimestamp': 1419351026.932,
'eventType': 'ActivityTaskCompleted'},
Expand Down
53 changes: 53 additions & 0 deletions tests/test_context.py
@@ -0,0 +1,53 @@
from __future__ import absolute_import
try:
from unittest.mock import MagicMock
except:
from mock import MagicMock
import boto.swf.layer2 as swf

from garcon import context
from tests.fixtures import decider as decider_events


def mock(monkeypatch):
for base in [swf.Decider, swf.WorkflowType, swf.ActivityType, swf.Domain]:
monkeypatch.setattr(base, '__init__', MagicMock(return_value=None))
if base is not swf.Decider:
monkeypatch.setattr(base, 'register', MagicMock())


def test_context_creation_without_events(monkeypatch):
"""Check the basic context creation.
"""

mock(monkeypatch)
current_context = context.ExecutionContext()
assert not current_context.current
assert not current_context.workflow_input


def test_context_creation_with_events(monkeypatch):
"""Test context creation with events.
"""

mock(monkeypatch)
from tests.fixtures import decider as poll

current_context = context.ExecutionContext(poll.history.get('events'))
assert current_context.current == {'k': 'v'}

def test_get_workflow_execution_info(monkeypatch):
"""Check that the workflow execution info are properly extracted
"""

mock(monkeypatch)
from tests.fixtures import decider as poll

current_context = context.ExecutionContext()
current_context.set_workflow_execution_info(poll.history, 'dev')

# Test extracting workflow execution info
assert current_context.current == {
'execution.domain': 'dev',
'execution.run_id': '123abc=',
'execution.workflow_id': 'test-workflow-id'}
19 changes: 0 additions & 19 deletions tests/test_decider.py
Expand Up @@ -41,25 +41,6 @@ def test_create_decider(monkeypatch):
assert not dec.register.called


def test_get_workflow_execution_info(monkeypatch):
"""Check that the workflow execution info are properly extracted
"""

mock(monkeypatch)
from tests.fixtures.flows import example
from tests.fixtures import decider as poll

d = decider.DeciderWorker(example)

# Test extracting workflow execution info
assert d.get_workflow_execution_info(poll.history) == {
'execution.domain': 'dev',
'execution.run_id': '123abc=',
'execution.workflow_id': 'test-workflow-id'}

assert d.get_workflow_execution_info({}) is None


def test_get_history(monkeypatch):
"""Test the decider history
"""
Expand Down

0 comments on commit d1323af

Please sign in to comment.