Permalink
Browse files

Closes OOZIE-112 workflow kill node message is not resolved and set it to action error message
  • Loading branch information...
1 parent c52c365 commit 1fcf0e4b4ae1218766dfa2d38dfbb9bc1beb9e90 Angelo Kaichen Huang committed Jun 1, 2011
View
1 core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -122,6 +122,7 @@
E0726(XLog.STD, "Workflow action can not be killed, {0}"),
E0727(XLog.STD, "Workflow Job can not be suspended as its not in running state, {0}, Status: {1}"),
E0728(XLog.STD, "Coordinator Job can not be suspended as job finished or failed or killed, id : {0}, status : {1}"),
+ E0729(XLog.OPS, "Kill node message [{0}]"),
E0800(XLog.STD, "Action it is not running its in [{1}] state, action [{0}]"),
E0801(XLog.STD, "Workflow already running, workflow [{0}]"),
View
7 core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
@@ -198,6 +198,7 @@ protected Void execute() throws CommandException {
wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage());
wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
wfAction.setEndTime(null);
+
switch (ex.getErrorType()) {
case TRANSIENT:
if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) {
@@ -219,13 +220,19 @@ protected Void execute() throws CommandException {
failJob(context);
break;
}
+
+ WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
+ DagELFunctions.setActionInfo(wfInstance, wfAction);
+ wfJob.setWorkflowInstance(wfInstance);
+
try {
jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
}
catch (JPAExecutorException je) {
throw new CommandException(je);
}
+
}
catch (JPAExecutorException je) {
throw new CommandException(je);
View
31 core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -25,6 +25,7 @@
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
+import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
@@ -39,6 +40,8 @@
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.workflow.lite.KillNodeDef;
+import org.apache.oozie.workflow.lite.NodeDef;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
@@ -220,6 +223,34 @@ protected Void execute() throws CommandException {
if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
}
+
+ // output message for Kill node
+ if (wfAction != null) { // wfAction could be a no-op job
+ NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
+ if (nodeDef instanceof KillNodeDef) {
+ ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, false);
+ try {
+ String tmpNodeConf = nodeDef.getConf();
+ String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
+ LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
+ jobId, actionId, tmpNodeConf, actionConf);
+ if (wfAction.getErrorCode() != null) {
+ wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
+ } else {
+ wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
+ }
+ jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
+ }
+ catch (JPAExecutorException je) {
+ throw new CommandException(je);
+ }
+ catch (Exception ex) {
+ LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex);
+ throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
+ }
+ }
+ }
+
}
else {
for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
View
2 core/src/main/java/org/apache/oozie/service/CallableQueueService.java
@@ -435,7 +435,7 @@ protected void debug(String msgTemplate, Object... msgArgs) {
queueSize) {
// Forwards the message template and its arguments to log; this change lowers the level from debug to trace.
@Override
protected void debug(String msgTemplate, Object... msgArgs) {
- log.debug(msgTemplate, msgArgs);
+ log.trace(msgTemplate, msgArgs);
}
@Override
View
8 core/src/main/java/org/apache/oozie/workflow/WorkflowInstance.java
@@ -15,6 +15,7 @@
package org.apache.oozie.workflow;
import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.workflow.lite.NodeDef;
import java.util.Map;
@@ -185,4 +186,11 @@ public boolean isEndState() {
*/
public String getTransition(String node);
+ /**
+ * Return the node definition associated with the given execution path of this workflow instance.
+ *
+ * @param executionPath execution path identifying the node within the workflow instance.
+ * @return the node definition found for that execution path.
+ */
+ public NodeDef getNodeDef(String executionPath);
+
}
View
19 core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
@@ -305,6 +305,23 @@ else if (context.status == Status.SUSPENDED) {
return status.isEndState();
}
+ /**
+ * Get the NodeDef for the given execution path of this workflow instance.
+ * <p/>
+ * An unknown execution path, or a node name that has no definition, is logged as an error and
+ * reported to the caller as <code>null</code>.
+ *
+ * @param executionPath execution path
+ * @return node def, or <code>null</code> if the execution path or its node definition is not found
+ */
+ public NodeDef getNodeDef(String executionPath) {
+ NodeInstance nodeJob = executionPaths.get(executionPath);
+ if (nodeJob == null) {
+ log.error("invalid execution path [{0}]", executionPath);
+ // Nothing to look up: reading nodeJob.nodeName below would throw a NullPointerException.
+ return null;
+ }
+ NodeDef nodeDef = def.getNode(nodeJob.nodeName);
+ if (nodeDef == null) {
+ log.error("invalid transition [{0}]", nodeJob.nodeName);
+ }
+ return nodeDef;
+ }
+
public synchronized void fail(String nodeName) throws WorkflowException {
if (status.isEndState()) {
throw new WorkflowException(ErrorCode.E0718);
@@ -601,10 +618,12 @@ public String getTransition(String node) {
return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
}
+ @Override
public boolean equals(Object o) {
// Only a non-null object accepted by this class' isInstance check can be equal.
if (o == null || !getClass().isInstance(o)) {
return false;
}
// Equality is decided by comparing the other instance's id with this instance's id.
return ((WorkflowInstance) o).getId().equals(instanceId);
}
+ @Override
public int hashCode() {
// Derived from instanceId, the value equals() compares against.
return instanceId.hashCode();
}
View
47 core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
@@ -30,6 +30,7 @@
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
@@ -212,6 +213,52 @@ public void testEndDataNotSet() throws Exception {
}
/**
+ * Provides functionality to test kill node message
+ *
+ * @throws Exception
+ */
+ public void testKillNodeErrorMessage() throws Exception {
+ Reader reader = IOUtils.getResourceAsReader("wf-test-kill-node-message.xml", -1);
+ Writer writer = new FileWriter(getTestCaseDir() + "/workflow.xml");
+ IOUtils.copyCharStream(reader, writer);
+
+ final DagEngine engine = new DagEngine("u", "a");
+ Configuration conf = new XConfiguration();
+ conf.set(OozieClient.APP_PATH, getTestCaseDir() + File.separator + "workflow.xml");
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ conf.set(OozieClient.GROUP_NAME, getTestGroup());
+ injectKerberosInfo(conf);
+ conf.set(OozieClient.LOG_TOKEN, "t");
+ conf.set("error", "end.error");
+ conf.set("external-status", "FAILED/KILLED");
+ conf.set("signal-value", "fail");
+
+ final String jobId = engine.submitJob(conf, true);
+
+ final JPAService jpaService = Services.get().get(JPAService.class);
+ final WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(jobId);
+
+ waitFor(50000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ WorkflowJobBean job = jpaService.execute(wfJobGetCmd);
+ return (job.getWorkflowInstance().getStatus() == WorkflowInstance.Status.KILLED);
+ }
+ });
+
+ WorkflowJobBean job = jpaService.execute(wfJobGetCmd);
+ assertEquals(WorkflowJob.Status.KILLED, job.getStatus());
+
+ WorkflowActionsGetForJobJPAExecutor wfActionsGetCmd = new WorkflowActionsGetForJobJPAExecutor(jobId);
+ List<WorkflowActionBean> actions = jpaService.execute(wfActionsGetCmd);
+
+ int n = actions.size();
+ WorkflowActionBean action = actions.get(n - 1);
+ assertEquals("TEST_ERROR", action.getErrorCode());
+ assertEquals("[end]", action.getErrorMessage());
+ assertEquals(WorkflowAction.Status.ERROR, action.getStatus());
+ }
+
+ /**
* Provides functionality to test non transient failures.
*
* @param errorType the error type. (start.non-transient, end.non-transient)
View
38 core/src/test/resources/wf-test-kill-node-message.xml
@@ -0,0 +1,38 @@
+<!--
+ Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.1" name="test-wf">
+
+ <start to="a"/>
+
+ <action name="a">
+ <test xmlns="uri:test">
+ <signal-value>${wf:conf('signal-value')}</signal-value>
+ <external-status>${wf:conf('external-status')}</external-status>
+ <error>${wf:conf('error')}</error>
+ <avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>
+ <avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>
+ <running-mode>${wf:conf('running-mode')}</running-mode>
+ </test>
+ <ok to="end"/>
+ <error to="kill"/>
+ </action>
+
+ <!-- The message is an EL expression; per OOZIE-112 it is expected to be resolved and set as the action error message. -->
+ <kill name="kill">
+ <message>[${wf:errorMessage(wf:lastErrorNode())}]</message>
+ </kill>
+
+ <end name="end"/>
+
+</workflow-app>
View
2 release-log.txt
@@ -1,5 +1,7 @@
-- Oozie 3.1.0 release
+OOZIE-113 merge changes for OOZIE-101 to ActionEndXCommand
+OOZIE-112 workflow kill node message is not resolved and set it to action error message
OOZIE-111 adjust configuration for DBCP data source
OOZIE-105 bring JPA changes to master from integration branch
OOZIE-103 upgrade openjpa jar to 2.1.0

0 comments on commit 1fcf0e4

Please sign in to comment.