Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Make some improvements for "layer1" SWF

 * Treat AWS/SWF response faults as Python exceptions:
   * Add SWFReponseError to boto.exceptions as base exception
     class for SWF faults.
   * Added boto.swf.exceptions module to hold exception classes
     for select SWF faults.
   * Make faults raise exceptions in boto.swf.layer1 make_request().

 * Make more service calls work:
   * Make register_workflow_type() functional.
   * Handle workflow_id filter in list_closed_workflow_executions().
   * Make list_open_workflow_executions() functional.
  • Loading branch information...
commit c857673615dcf296c3abd34b5da633e970f7efc9 1 parent bdddbbd
@eckamm eckamm authored
Showing with 160 additions and 31 deletions.
  1. +26 −0 boto/exception.py
  2. +37 −0 boto/swf/exceptions.py
  3. +97 −31 boto/swf/layer1.py
View
26 boto/exception.py
@@ -322,6 +322,32 @@ def __init__(self, status, reason, body=None, *args):
if self.error_code:
self.error_code = self.error_code.split('#')[-1]
+
+class SWFResponseError(BotoServerError):
+ """
+ This exception expects the fully parsed and decoded JSON response
+ body to be passed as the body parameter.
+
+ :ivar status: The HTTP status code.
+ :ivar reason: The HTTP reason message.
+ :ivar body: The Python dict that represents the decoded JSON
+ response body.
+ :ivar error_message: The full description of the AWS error encountered.
+ :ivar error_code: A short string that identifies the AWS error
+ (e.g. ConditionalCheckFailedException)
+ """
+
+ def __init__(self, status, reason, body=None, *args):
+ self.status = status
+ self.reason = reason
+ self.body = body
+ if self.body:
+ self.error_message = self.body.get('message', None)
+ self.error_code = self.body.get('__type', None)
+ if self.error_code:
+ self.error_code = self.error_code.split('#')[-1]
+
+
class EmrResponseError(BotoServerError):
"""
Error in response from EMR
View
37 boto/swf/exceptions.py
@@ -0,0 +1,37 @@
+"""
+Exceptions that are specific to the swf module.
+
+This module subclasses the base SWF response exception,
+boto.exceptions.SWFResponseError, for some of the SWF specific faults.
+"""
+from boto.exception import SWFResponseError
+
+
+class SWFDomainAlreadyExistsError(SWFResponseError):
+ """
+ Raised when when the domain already exists.
+ """
+ pass
+
+
+class SWFLimitExceededError(SWFResponseError):
+ """
+ Raised when when a system imposed limitation has been reached.
+ """
+ pass
+
+
+class SWFOperationNotPermittedError(SWFResponseError):
+ """
+ Raised when (reserved for future use).
+ """
+
+
+class SWFTypeAlreadyExistsError(SWFResponseError):
+ """
+ Raised when when the workflow type or activity type already exists.
+ """
+ pass
+
+
+
View
128 boto/swf/layer1.py
@@ -25,7 +25,8 @@
import boto
from boto.connection import AWSAuthConnection
from boto.provider import Provider
-from boto.exception import DynamoDBResponseError
+from boto.exception import SWFResponseError
+from boto.swf import exceptions as swf_exceptions
import time
try:
@@ -50,8 +51,21 @@ class Layer1(AWSAuthConnection):
ServiceName = 'com.amazonaws.swf.service.model.SimpleWorkflowService'
"""The name of the Service"""
-
- ResponseError = DynamoDBResponseError
+
+ # In some cases, the fault response __type value is mapped to
+ # an exception class more specific than SWFResponseError.
+ _fault_excp = {
+ 'com.amazonaws.swf.base.model#DomainAlreadyExistsFault':
+ swf_exceptions.SWFDomainAlreadyExistsError,
+ 'com.amazonaws.swf.base.model#LimitExceededFault':
+ swf_exceptions.SWFLimitExceededError,
+ 'com.amazonaws.swf.base.model#OperationNotPermittedFault':
+ swf_exceptions.SWFOperationNotPermittedError,
+ 'com.amazonaws.swf.base.model#TypeAlreadyExistsFault':
+ swf_exceptions.SWFTypeAlreadyExistsError ,
+ }
+
+ ResponseError = SWFResponseError
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
is_secure=True, port=None, proxy=None, proxy_port=None,
@@ -75,7 +89,7 @@ def _required_auth_capability(self):
def make_request(self, action, body='', object_hook=None):
"""
- :raises: ``DynamoDBExpiredTokenError`` if the security token expires.
+ :raises: ``SWFResponseError`` if response status is not 200.
"""
headers = {'X-Amz-Target': '%s.%s' % (self.ServiceName, action),
'Content-Type': 'application/json; charset=UTF-8',
@@ -87,10 +101,19 @@ def make_request(self, action, body='', object_hook=None):
override_num_retries=10)
response_body = response.read()
boto.log.debug(response_body)
- if response_body:
- return json.loads(response_body, object_hook=object_hook)
+ if response.status == 200:
+ if response_body:
+ return json.loads(response_body, object_hook=object_hook)
+ else:
+ return None
else:
- return None
+ json_body = json.loads(response_body)
+ fault_name = json_body.get('__type', None)
+ # Certain faults get mapped to more specific exception classes.
+ excp_cls = self._fault_excp.get(fault_name, self.ResponseError)
+ raise excp_cls(response.status, response.reason, body=json_body)
+
+
# Actions related to Activities
@@ -655,7 +678,8 @@ def deprecate_activity_type(self, domain, activity_name, activity_version):
## Workflow Management
- def register_workflow_type(self, domain, name, task_list=None,
+ def register_workflow_type(self, domain, name, version,
+ task_list=None,
default_child_policy=None,
default_execution_start_to_close_timeout=None,
default_task_start_to_close_timeout=None,
@@ -671,6 +695,9 @@ def register_workflow_type(self, domain, name, task_list=None,
:type name: string
:param name: The name of the workflow type.
+ :type version: string
+ :param version: The version of the workflow type.
+
:type task_list: list of name, version of tasks
:param name: If set, specifies the default task list to use
for scheduling decision tasks for executions of this workflow
@@ -721,7 +748,7 @@ def register_workflow_type(self, domain, name, task_list=None,
:raises: TypeAlreadyExistsFault, LimitExceededFault,
UnknownResourceFault, OperationNotPermittedFault
"""
- data = {'domain': domain, 'name': name}
+ data = {'domain': domain, 'name': name, 'version': version}
if task_list:
data['defaultTaskList'] = {'name': task_list}
if default_child_policy:
@@ -1119,53 +1146,88 @@ def count_open_workflow_executions(self, domain, latest_date, oldest_date,
json_input = json.dumps(data)
return self.make_request('CountOpenWorkflowExecutions', json_input)
- def list_open_workflow_executions(self, domain, name, oldest_date, tag, workflow_id, latest_date=None, maximum_page_size=None, next_page_token=None, reverse_order=None, version=None):
- """
- No documentation supplied.
+ def list_open_workflow_executions(self, domain,
+ latest_date=None,
+ oldest_date=None,
+ tag=None, workflow_id=None,
+ workflow_name=None,
+ workflow_version=None,
+ maximum_page_size=None,
+ next_page_token=None,
+ reverse_order=None):
+ """
+ Returns the list of open workflow executions within the
+ given domain that meet the specified filtering criteria.
+
+ .. note:
+ workflow_id, workflow_name/workflow_version
+ and tag are mutually exclusive. You can specify at most
+ one of these in a request.
:type domain: string
- :param domain: no docs
+ :param domain: The name of the domain containing the
+ workflow executions to count.
- :type name: string
- :param name: no docs
+ :type latest_date: timestamp
+ :param latest_date: Specifies the latest start or close date
+ and time to return.
:type oldest_date: timestamp
- :param oldest_date: no docs
+ :param oldest_date: Specifies the oldest start or close date
+ and time to return.
:type tag: string
- :param tag: no docs
+ :param tag: If specified, only executions that have a tag
+ that matches the filter are counted.
:type workflow_id: string
- :param workflow_id: no docs
+ :param workflow_id: If specified, only workflow executions
+ matching the workflow_id are counted.
- :type latest_date: timestamp
- :param latest_date: no docs
+ :type workflow_name: string
+ :param workflow_name: Name of the workflow type to filter on.
+
+ :type workflow_version: string
+ :param workflow_version: Version of the workflow type to filter on.
:type maximum_page_size: integer
- :param maximum_page_size: no docs
+ :param maximum_page_size: The maximum number of results
+ returned in each page. The default is 100, but the caller can
+ override this value to a page size smaller than the
+ default. You cannot specify a page size greater than 100.
:type next_page_token: string
- :param next_page_token: no docs
+ :param next_page_token: If on a previous call to this method a
+ NextPageToken was returned, the results are being
+ paginated. To get the next page of results, repeat the call
+ with the returned token and all other arguments unchanged.
:type reverse_order: boolean
- :param reverse_order: no docs
+ :param reverse_order: When set to true, returns the results in
+ reverse order. By default the results are returned in
+ descending order of the start or the close time of the
+ executions.
- :type version: string
- :param version: no docs
+ :raises: UnknownResourceFault, OperationNotPermittedFault
- :raises: #UnknownResourceFault, #OperationNotPermittedFault
"""
- data = {'domain': domain, 'name': name, 'oldestDate': oldest_date, 'tag': tag, 'workflowId': workflow_id}
- if latest_date:
- data['latestDate'] = latest_date
+ data = {'domain': domain}
+ data['startTimeFilter'] = {'oldestDate': oldest_date,
+ 'latestDate': latest_date}
+ if tag:
+ data['tagFilter'] = {'tag': tag}
+ if workflow_name and workflow_version:
+ data['typeFilter'] = {'name': workflow_name,
+ 'version': workflow_version}
+ if workflow_id:
+ data['executionFilter'] = {'workflowId': workflow_id}
+
if maximum_page_size:
data['maximumPageSize'] = maximum_page_size
if next_page_token:
data['nextPageToken'] = next_page_token
if reverse_order:
data['reverseOrder'] = 'true'
- if version:
- data['version'] = version
json_input = json.dumps(data)
return self.make_request('ListOpenWorkflowExecutions', json_input)
@@ -1356,6 +1418,10 @@ def list_closed_workflow_executions(self, domain,
if close_latest_date and close_oldest_date:
data['closeTimeFilter'] = {'oldestDate': close_oldest_date,
'latestDate': close_latest_date}
+
+ if workflow_id:
+ data['executionFilter'] = {'workflowId': workflow_id}
+
if close_status:
data['closeStatusFilter'] = {'status': close_status}
if tag:
Please sign in to comment.
Something went wrong with that request. Please try again.