diff --git a/garcon/context.py b/garcon/context.py new file mode 100644 index 0000000..cdfd0d1 --- /dev/null +++ b/garcon/context.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +""" +Context +======= + +Context carries information that have been retrieved from the different SWF +events of an execution. +""" + +import json + + +class ExecutionContext: + + def __init__(self, events=None): + """Create the execution context. + + An execution context gathers the execution input and the result of all + the activities that have successfully ran. It also adds the execution + input into the mix (for logger purposes). + + Args: + events (list): optional list of all the events. + """ + + self.current = {} + self.workflow_input = {} + + if events: + for event in events: + self.add(event) + + def add(self, event): + """Add an event into the execution context. + + The events are the ones coming from SWF directly (so the fields are the + ones we expect). + + Args: + event (dict): the event to add to the context. + """ + + event_type = event.get('eventType') + if event_type == 'ActivityTaskCompleted': + self.add_activity_result(event) + elif event_type == 'WorkflowExecutionStarted': + self.set_execution_input(event) + + def set_workflow_execution_info(self, execution_info, domain): + """Add the workflow execution info. + + Workflow execution info contains the domain, workflow id and run id. + This allows the logger to properly namespace the messages and + facilitate debugging. + + Args: + execution_info (dict): the execution information. + domain (str): the current domain + """ + + if ('workflowExecution' in execution_info and + 'workflowId' in execution_info['workflowExecution'] and + 'runId' in execution_info['workflowExecution']): + + workflow_execution = execution_info['workflowExecution'] + self.current.update({ + 'execution.domain': domain, + 'execution.workflow_id': workflow_execution['workflowId'], + 'execution.run_id': workflow_execution['runId'] + }) + + def set_execution_input(self, execution_event): + """Add the workflow execution input. + + Please note the input within the execution event should always be a + json string. + + Args: + execution_event (str): the execution event information. + """ + + attributes = execution_event['workflowExecutionStartedEventAttributes'] + result = attributes.get('input') + if result: + result = json.loads(result) + self.workflow_input = result + self.current.update(result) + + def add_activity_result(self, activity_event): + """Add an activity result. + + Please note: the result of an activity event should always be a json + string. + + Args: + activity_event (str): json object that represents the activity + information. + """ + + attributes = activity_event['activityTaskCompletedEventAttributes'] + result = attributes.get('result') + + if result: + self.current.update(json.loads(result)) diff --git a/garcon/decider.py b/garcon/decider.py index 8a33941..6db3184 100755 --- a/garcon/decider.py +++ b/garcon/decider.py @@ -61,29 +61,6 @@ def get_history(self, poll): # Remove all the events that are related to decisions and only. return [e for e in events if not e['eventType'].startswith('Decision')] - def get_workflow_execution_info(self, poll): - """Get the workflow execution info from a given poll if it exists. - - Args: - poll (object): The poll object (see AWS SWF for details.) - Return: - `dict`: Workflow execution info including domain, workflowId and - runId. - """ - - execution_info = None - if ('workflowExecution' in poll and 'workflowId' in - poll['workflowExecution'] and 'runId' in - poll['workflowExecution']): - - workflow_execution = poll['workflowExecution'] - execution_info = { - 'execution.domain': self.domain, - 'execution.workflow_id': workflow_execution['workflowId'], - 'execution.run_id': workflow_execution['runId'] - } - - return execution_info def get_activity_states(self, history): """Get the activity states from the history. @@ -146,14 +123,14 @@ def create_decisions_from_flow(self, decisions, activity_states, context): try: for current in activity.find_available_activities( - self.flow, activity_states, context): + self.flow, activity_states, context.current): schedule_activity_task( decisions, current, version=self.version) else: activities = list( activity.find_uncomplete_activities( - self.flow, activity_states, context)) + self.flow, activity_states, context.current)) if not activities: decisions.complete_workflow_execution() except Exception as e: @@ -178,11 +155,17 @@ def delegate_decisions(self, decisions, decider, history, context): schedule_context = ScheduleContext() decider_schedule = functools.partial( - schedule, decisions, schedule_context, history, context, + schedule, decisions, schedule_context, history, context.current, version=self.version) try: - decider(schedule=decider_schedule) + kwargs = dict(schedule=decider_schedule) + + # retro-compatibility. + if 'context' in decider.__code__.co_varnames: + kwargs.update(context=context.workflow_input) + + decider(**kwargs) # When no exceptions are raised and the method decider has returned # it means that there i nothing left to do in the current decider. @@ -221,19 +204,16 @@ def run(self): history = self.get_history(poll) activity_states = self.get_activity_states(history) - workflow_execution_info = self.get_workflow_execution_info(poll) - context = event.get_current_context(history) - - if workflow_execution_info is not None: - context.update(workflow_execution_info) + current_context = event.get_current_context(history) + current_context.set_workflow_execution_info(poll, self.domain) decisions = swf.Layer1Decisions() if not custom_decider: self.create_decisions_from_flow( - decisions, activity_states, context) + decisions, activity_states, current_context) else: self.delegate_decisions( - decisions, custom_decider, activity_states, context) + decisions, custom_decider, activity_states, current_context) self.complete(decisions=decisions) return True diff --git a/garcon/event.py b/garcon/event.py index a6c854d..e536ebe 100755 --- a/garcon/event.py +++ b/garcon/event.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- from garcon import activity +from garcon import context import json @@ -90,21 +91,5 @@ def get_current_context(events): """ events = sorted(events, key=lambda item: item.get('eventId')) - context = {} - - for event in events: - event_type = event.get('eventType') - result = None - - if event_type == 'ActivityTaskCompleted': - attributes = event['activityTaskCompletedEventAttributes'] - result = attributes.get('result') - - if event_type == 'WorkflowExecutionStarted': - attributes = event['workflowExecutionStartedEventAttributes'] - result = attributes.get('input') - - if result: - context.update(json.loads(result)) - - return context + execution_context = context.ExecutionContext(events) + return execution_context diff --git a/tests/fixtures/decider.py b/tests/fixtures/decider.py index 68254b5..33f6c10 100644 --- a/tests/fixtures/decider.py +++ b/tests/fixtures/decider.py @@ -121,7 +121,7 @@ 'eventTimestamp': 1419351026.803, 'eventType': 'DecisionTaskCompleted'}, {'activityTaskCompletedEventAttributes': { - 'scheduledEventId': 11, 'startedEventId': 16}, + 'scheduledEventId': 11, 'startedEventId': 16, 'result': '{"k": "v"}'}, 'eventId': 19, 'eventTimestamp': 1419351026.932, 'eventType': 'ActivityTaskCompleted'}, diff --git a/tests/test_context.py b/tests/test_context.py new file mode 100644 index 0000000..96a8429 --- /dev/null +++ b/tests/test_context.py @@ -0,0 +1,53 @@ +from __future__ import absolute_import +try: + from unittest.mock import MagicMock +except: + from mock import MagicMock +import boto.swf.layer2 as swf + +from garcon import context +from tests.fixtures import decider as decider_events + + +def mock(monkeypatch): + for base in [swf.Decider, swf.WorkflowType, swf.ActivityType, swf.Domain]: + monkeypatch.setattr(base, '__init__', MagicMock(return_value=None)) + if base is not swf.Decider: + monkeypatch.setattr(base, 'register', MagicMock()) + + +def test_context_creation_without_events(monkeypatch): + """Check the basic context creation. + """ + + mock(monkeypatch) + current_context = context.ExecutionContext() + assert not current_context.current + assert not current_context.workflow_input + + +def test_context_creation_with_events(monkeypatch): + """Test context creation with events. + """ + + mock(monkeypatch) + from tests.fixtures import decider as poll + + current_context = context.ExecutionContext(poll.history.get('events')) + assert current_context.current == {'k': 'v'} + +def test_get_workflow_execution_info(monkeypatch): + """Check that the workflow execution info are properly extracted + """ + + mock(monkeypatch) + from tests.fixtures import decider as poll + + current_context = context.ExecutionContext() + current_context.set_workflow_execution_info(poll.history, 'dev') + + # Test extracting workflow execution info + assert current_context.current == { + 'execution.domain': 'dev', + 'execution.run_id': '123abc=', + 'execution.workflow_id': 'test-workflow-id'} diff --git a/tests/test_decider.py b/tests/test_decider.py index fd22b60..50b9b2d 100755 --- a/tests/test_decider.py +++ b/tests/test_decider.py @@ -41,25 +41,6 @@ def test_create_decider(monkeypatch): assert not dec.register.called -def test_get_workflow_execution_info(monkeypatch): - """Check that the workflow execution info are properly extracted - """ - - mock(monkeypatch) - from tests.fixtures.flows import example - from tests.fixtures import decider as poll - - d = decider.DeciderWorker(example) - - # Test extracting workflow execution info - assert d.get_workflow_execution_info(poll.history) == { - 'execution.domain': 'dev', - 'execution.run_id': '123abc=', - 'execution.workflow_id': 'test-workflow-id'} - - assert d.get_workflow_execution_info({}) is None - - def test_get_history(monkeypatch): """Test the decider history """