Skip to content

Commit

Permalink
Merge "Fixing engine transaction model and error handling"
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Mar 30, 2016
2 parents df1cab7 + ad07ba0 commit ea935a0
Show file tree
Hide file tree
Showing 11 changed files with 471 additions and 328 deletions.
28 changes: 28 additions & 0 deletions AUTHORS
@@ -1,43 +1,71 @@
Abhishek Chanda <abhishek@cloudscaling.com>
Alexander Kuznetsov <akuznetsov@mirantis.com>
Anastasia Kuznetsova <akuznetsova@mirantis.com>
Andreas Jaeger <aj@suse.com>
Angus Salkeld <angus.salkeld@rackspace.com>
Ankita Wagh <ankita_wagh@symmactoolkit-c02lr80ufd57.symc.symantec.com>
Antoine Musso <hashar@free.fr>
Bertrand Lallau <bertrand.lallau@gmail.com>
Bhaskar Duvvuri <dbasu84@gmail.com>
Boris Pavlovic <boris@pavlovic.me>
Bryan Havenstein <bryan.havenstein@ericsson.com>
Chaozhe.Chen <chaozhe.chen@easystack.cn>
Christian Berendt <berendt@b1-systems.de>
Claudiu Belu <cbelu@cloudbasesolutions.com>
Dan Prince <dprince@redhat.com>
Daryl Mowrer <dmowrer@us.ibm.com>
David C Kennedy <david.c.kennedy@hp.com>
David Charles Kennedy <dkennedy@hp.com>
Dawid Deja <dawid.deja@intel.com>
Dmitri Zimine <dz@stackstorm.com>
Doug Hellmann <doug@doughellmann.com>
Ed Cranford <ed.cranford@rackspace.com>
Gal Margalit <gal.margalit@alcatel-lucent.com>
Guy Paz <guy.paz@alcatel-lucent.com>
Jeremy Stanley <fungi@yuggoth.org>
Jiri Tomasek <jtomasek@redhat.com>
Kevin Pouget <kpouget@altair.com>
Kirill Izotov <enykeev@stackstorm.com>
Lakshmi Kannan <lakshmi@stackstorm.com>
Limor <limor.bortman@nokia.com>
Limor Stotland <limor.bortman@alcatel-lucent.com>
Lingxian Kong <konglingxian@huawei.com>
Liu Sheng <liusheng@huawei.com>
LiuNanke <nanke.liu@easystack.cn>
Manas Kelshikar <manas@stackstorm.com>
Michael Krotscheck <krotscheck@gmail.com>
Michal Gershenzon <michal.gershenzon@alcatel-lucent.com>
Monty Taylor <mordred@inaugust.com>
Moshe Elisha <moshe.elisha@alcatel-lucent.com>
Nikolay Mahotkin <nmakhotkin@mirantis.com>
Noa Koffman <noa.koffman@alcatel-lucent.com>
Oleksii Chuprykov <ochuprykov@mirantis.com>
Pierre-Arthur MATHIEU <pierre-arthur.mathieu@hp.com>
Ray Chen <chenrano2002@gmail.com>
Renat Akhmerov <rakhmerov@mirantis.com>
Renat Akhmerov <renat.akhmerov@gmail.com>
Rico Lin <rico.lin.guanyu@gmail.com>
Rinat Sabitov <rinat.sabitov@gmail.com>
Sergey Kolekonov <skolekonov@mirantis.com>
Sergey Murashov <smurashov@mirantis.com>
Shuquan Huang <huang.shuquan@99cloud.net>
Thierry Carrez <thierry@openstack.org>
Thomas Herve <therve@redhat.com>
Timur Nurlygayanov <tnurlygayanov@mirantis.com>
Venkata Mahesh Kotha <venkatamaheshkotha@gmail.com>
Winson Chan <wcchan@stackstorm.com>
Yaroslav Lobankov <ylobankov@mirantis.com>
Zhao Lei <zhaolei@cn.fujitsu.com>
Zhenguo Niu <niuzhenguo@huawei.com>
ZhiQiang Fan <aji.zqfan@gmail.com>
ZhiQiang Fan <zhiqiang.fan@huawei.com>
Zhu Rong <zhu.rong@99cloud.net>
caoyue <yue.cao@easystack.cn>
cheneydc <dongc@neunn.com>
hardik <hardik.parekh@nectechnologies.in>
hparekh <hardik.parekh@nectechnologies.in>
keliang <ke.liang@easystack.cn>
syed ahsan shamim zaidi <ahsanmohsin04@yahoo.com>
tengqm <tengqim@cn.ibm.com>
wangzhh <wangzhh@awcloud.com>
zhangguoqing <zhang.guoqing@99cloud.net>
2 changes: 1 addition & 1 deletion mistral/engine/action_handler.py
Expand Up @@ -29,7 +29,7 @@

def create_action_execution(action_def, action_input, task_ex=None,
index=0, description=''):
# TODO(rakhmerov): We can avoid hitting DB at all when calling something
# TODO(rakhmerov): We can avoid hitting DB at all when calling things like
# create_action_execution(), these operations can be just done using
# SQLAlchemy session (1-level cache) and session flush (on TX commit) would
# send necessary SQL queries to DB. Currently, session flush happens
Expand Down
113 changes: 60 additions & 53 deletions mistral/engine/default_engine.py
Expand Up @@ -24,6 +24,7 @@
from mistral.engine import base
from mistral.engine import task_handler
from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions as exc
from mistral.services import action_manager as a_m
from mistral.services import executions as wf_ex_service
from mistral.services import workflows as wf_service
Expand Down Expand Up @@ -55,6 +56,9 @@ def start_workflow(self, wf_identifier, wf_input, description='',
wf_ex_id = None

try:
# Create a persistent workflow execution in a separate transaction
# so that we can return it even in case of unexpected errors that
# lead to transaction rollback.
with db_api.transaction():
# The new workflow execution will be in an IDLE
# state on initial record creation.
Expand All @@ -65,10 +69,6 @@ def start_workflow(self, wf_identifier, wf_input, description='',
params
)

# Separate workflow execution creation and dispatching command
# transactions in order to be able to return workflow execution
# with corresponding error message in state_info when error occurs
# at dispatching commands.
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id)
wf_handler.set_execution_state(wf_ex, states.RUNNING)
Expand Down Expand Up @@ -161,14 +161,10 @@ def on_task_state_change(self, task_ex_id, state, state_info=None):

self._on_task_state_change(task_ex, wf_ex, wf_spec)

def _on_task_state_change(self, task_ex, wf_ex, wf_spec,
task_state=states.SUCCESS):
def _on_task_state_change(self, task_ex, wf_ex, wf_spec):
task_spec = wf_spec.get_tasks()[task_ex.name]

# We must be sure that if task is completed,
# it was also completed in previous transaction.
if (task_handler.is_task_completed(task_ex, task_spec)
and states.is_completed(task_state)):
if task_handler.is_task_completed(task_ex, task_spec):
task_handler.after_task_complete(task_ex, task_spec, wf_spec)

# Ignore DELAYED state.
Expand All @@ -178,8 +174,21 @@ def _on_task_state_change(self, task_ex, wf_ex, wf_spec,
wf_ctrl = wf_base.get_controller(wf_ex, wf_spec)

# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow()
try:
cmds = wf_ctrl.continue_workflow()
except exc.YaqlEvaluationException as e:
LOG.error(
'YAQL error occurred while calculating next workflow '
'commands [wf_ex_id=%s, task_ex_id=%s]: %s',
wf_ex.id, task_ex.id, e
)

wf_handler.fail_workflow(wf_ex, str(e))

return

# Mark task as processed after all decisions have been made
# upon its completion.
task_ex.processed = True

self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
Expand Down Expand Up @@ -235,6 +244,7 @@ def on_action_complete(self, action_ex_id, result):

wf_ex_id = action_ex.task_execution.workflow_execution_id
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)

wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)

task_ex = task_handler.on_action_complete(
Expand All @@ -248,30 +258,13 @@ def on_action_complete(self, action_ex_id, result):
if states.is_paused_or_completed(wf_ex.state):
return action_ex.get_clone()

prev_task_state = task_ex.state

# Separate the task transition in a separate transaction. The task
# has already completed for better or worst. The task state should
# not be affected by errors during transition on conditions such as
# on-success and on-error.
with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
action_ex = db_api.get_action_execution(action_ex_id)
task_ex = action_ex.task_execution

self._on_task_state_change(
task_ex,
wf_ex,
wf_spec,
task_state=prev_task_state
)
self._on_task_state_change(task_ex, wf_ex, wf_spec)

return action_ex.get_clone()
except Exception as e:
# TODO(dzimine): try to find out which command caused failure.
# TODO(rakhmerov): Need to refactor logging in a more elegant way.
LOG.error(
"Failed to handle action execution result [id=%s]: %s\n%s",
'Failed to handle action execution result [id=%s]: %s\n%s',
action_ex_id, e, traceback.format_exc()
)

Expand Down Expand Up @@ -301,12 +294,13 @@ def _continue_workflow(self, wf_ex, task_ex=None, reset=True, env=None):

wf_ctrl = wf_base.get_controller(wf_ex)

# TODO(rakhmerov): Add YAQL error handling.
# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env)

# When resuming a workflow we need to ignore all 'pause'
# commands because workflow controller takes tasks that
# completed within the period when the workflow was pause.
# completed within the period when the workflow was paused.
cmds = list(
filter(
lambda c: not isinstance(c, commands.PauseWorkflow),
Expand All @@ -323,6 +317,7 @@ def _continue_workflow(self, wf_ex, task_ex=None, reset=True, env=None):
t_ex.processed = True

wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)

self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)

if not cmds:
Expand Down Expand Up @@ -378,9 +373,9 @@ def resume_workflow(self, wf_ex_id, env=None):
raise e

@u.log_exec(LOG)
def stop_workflow(self, execution_id, state, message=None):
def stop_workflow(self, wf_ex_id, state, message=None):
with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(execution_id)
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)

return self._stop_workflow(wf_ex, state, message)

Expand All @@ -390,13 +385,16 @@ def _stop_workflow(wf_ex, state, message=None):
wf_ctrl = wf_base.get_controller(wf_ex)

final_context = {}

try:
final_context = wf_ctrl.evaluate_workflow_final_context()
except Exception as e:
LOG.warning(
"Failed to get final context for %s: %s" % (wf_ex, e)
'Failed to get final context for %s: %s' % (wf_ex, e)
)

wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)

return wf_handler.succeed_workflow(
wf_ex,
final_context,
Expand All @@ -409,7 +407,7 @@ def _stop_workflow(wf_ex, state, message=None):
return wf_ex

@u.log_exec(LOG)
def rollback_workflow(self, execution_id):
def rollback_workflow(self, wf_ex_id):
# TODO(rakhmerov): Implement.
raise NotImplementedError

Expand All @@ -421,12 +419,26 @@ def _dispatch_workflow_commands(self, wf_ex, wf_cmds, wf_spec):
if isinstance(cmd, commands.RunTask) and cmd.is_waiting():
task_handler.defer_task(cmd)
elif isinstance(cmd, commands.RunTask):
task_handler.run_new_task(cmd, wf_spec)
task_ex = task_handler.run_new_task(cmd, wf_spec)

if task_ex.state == states.ERROR:
wf_handler.fail_workflow(
wf_ex,
'Failed to start task [task_ex=%s]: %s' %
(task_ex, task_ex.state_info)
)
elif isinstance(cmd, commands.RunExistingTask):
task_handler.run_existing_task(
task_ex = task_handler.run_existing_task(
cmd.task_ex.id,
reset=cmd.reset
)

if task_ex.state == states.ERROR:
wf_handler.fail_workflow(
wf_ex,
'Failed to start task [task_ex=%s]: %s' %
(task_ex, task_ex.state_info)
)
elif isinstance(cmd, commands.SetWorkflowState):
if states.is_completed(cmd.new_state):
self._stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg)
Expand All @@ -441,33 +453,28 @@ def _dispatch_workflow_commands(self, wf_ex, wf_cmds, wf_spec):
if wf_ex.state != states.RUNNING:
break

# TODO(rakhmerov): This method may not be needed at all because error
# handling is now implemented too roughly w/o distinguishing different
# errors. On most errors (like YAQLException) we shouldn't rollback
# transactions, we just need to fail corresponding execution objects
# where a problem happened (action, task or workflow).
@staticmethod
def _fail_workflow(wf_ex_id, err, action_ex_id=None):
def _fail_workflow(wf_ex_id, exc):
"""Private helper to fail workflow on exceptions."""
err_msg = str(err)

with db_api.transaction():
wf_ex = db_api.load_workflow_execution(wf_ex_id)

if wf_ex is None:
LOG.error(
"Cant fail workflow execution with id='%s': not found.",
"Can't fail workflow execution with id='%s': not found.",
wf_ex_id
)
return
return None

wf_handler.set_execution_state(wf_ex, states.ERROR, err_msg)

if action_ex_id:
# Note(dzimine): Don't call self.engine_client:
# 1) to avoid computing and triggering next tasks
# 2) to avoid a loop in case of error in transport
action_ex = db_api.get_action_execution(action_ex_id)
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)

task_handler.on_action_complete(
action_ex,
spec_parser.get_workflow_spec(wf_ex.spec),
wf_utils.Result(error=err_msg)
)
if not states.is_paused_or_completed(wf_ex.state):
wf_handler.set_execution_state(wf_ex, states.ERROR, str(exc))

return wf_ex

0 comments on commit ea935a0

Please sign in to comment.