Skip to content

Commit

Permalink
Deduplicate activity code.
Browse files Browse the repository at this point in the history
  • Loading branch information
severb committed Nov 19, 2014
1 parent 9f7e157 commit abfa668
Showing 1 changed file with 32 additions and 54 deletions.
86 changes: 32 additions & 54 deletions flowy/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,78 +72,56 @@ def _finish(self, result):
_deserialize_arguments = deserialize_args


class SWFActivity(Task):
def __init__(self, swf_client, input, token):
self._swf_client = swf_client
super(SWFActivity, self).__init__(input, token)

def _suspend(self):
return True

def fail(self, reason):
return _activity_fail(self._swf_client, self.token, reason)

def _finish(self, result):
try:
result = self._serialize_result(result)
except TypeError:
logger.exception('Error while serializing the result:')
return False
return _activity_finish(self._swf_client, self.token, result)

def heartbeat(self):
return _activity_heartbeat(self._swf_client, self.token)


class AsyncSWFActivity(object):
def __init__(self, swf_client, token):
self._swf_client = swf_client
self._token = token
self.token = token

def heartbeat(self):
return _activity_heartbeat(self._swf_client, self._token)
try:
t = str(self.token)
self._swf_client.record_activity_task_heartbeat(task_token=t)
except SWFResponseError:
logger.exception('Error while sending the heartbeat:')
return False
return True

def fail(self, reason):
return _activity_fail(self._swf_client, self._token, reason)
try:
self._swf_client.respond_activity_task_failed(
reason=str(reason)[:256], task_token=str(self.token))
except SWFResponseError:
logger.exception('Error while failing the activity:')
return False
return True

def finish(self, result):
try:
result = self._serialize_result(result)
except TypeError:
logger.exception('Error while serializing the result:')
return False
return _activity_finish(self._swf_client, self._token, result)
try:
self._swf_client.respond_activity_task_completed(
result=str(result), task_token=str(self._token))
except SWFResponseError:
logger.exception('Error while finishing the activity:')
return False
return True

_serialize_result = serialize_result


def _activity_heartbeat(swf_client, token):
try:
swf_client.record_activity_task_heartbeat(task_token=str(token))
except SWFResponseError:
logger.exception('Error while sending the heartbeat:')
return False
return True


def _activity_fail(swf_client, token, reason):
try:
swf_client.respond_activity_task_failed(
reason=str(reason)[:256], task_token=str(token))
except SWFResponseError:
logger.exception('Error while failing the activity:')
return False
return True


def _activity_finish(swf_client, token, result):
try:
swf_client.respond_activity_task_completed(
result=str(result), task_token=str(token))
except SWFResponseError:
logger.exception('Error while finishing the activity:')
return False
return True
class SWFActivity(AsyncSWFActivity, Task):
def __init__(self, swf_client, input, token):
self._swf_client = swf_client
Task.__init__(self, input, token)

def _suspend(self):
return True

def _finish(self, result):
return AsyncSWFActivity.finish(self, result)


class _SWFWorkflow(Task):
Expand Down

0 comments on commit abfa668

Please sign in to comment.