Skip to content

Commit

Permalink
Closes OOZIE-112 workflow kill node message is not resolved and set i…
Browse files Browse the repository at this point in the history
…t to action error message
  • Loading branch information
Angelo Kaichen Huang committed Jun 2, 2011
1 parent c52c365 commit 1fcf0e4
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 1 deletion.
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/oozie/ErrorCode.java
Expand Up @@ -122,6 +122,7 @@ public enum ErrorCode {
E0726(XLog.STD, "Workflow action can not be killed, {0}"),
E0727(XLog.STD, "Workflow Job can not be suspended as its not in running state, {0}, Status: {1}"),
E0728(XLog.STD, "Coordinator Job can not be suspended as job finished or failed or killed, id : {0}, status : {1}"),
E0729(XLog.OPS, "Kill node message [{0}]"),

E0800(XLog.STD, "Action it is not running its in [{1}] state, action [{0}]"),
E0801(XLog.STD, "Workflow already running, workflow [{0}]"),
Expand Down
Expand Up @@ -198,6 +198,7 @@ protected Void execute() throws CommandException {
wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage());
wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
wfAction.setEndTime(null);

switch (ex.getErrorType()) {
case TRANSIENT:
if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) {
Expand All @@ -219,13 +220,19 @@ protected Void execute() throws CommandException {
failJob(context);
break;
}

WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
DagELFunctions.setActionInfo(wfInstance, wfAction);
wfJob.setWorkflowInstance(wfInstance);

try {
jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
}
catch (JPAExecutorException je) {
throw new CommandException(je);
}

}
catch (JPAExecutorException je) {
throw new CommandException(je);
Expand Down
31 changes: 31 additions & 0 deletions core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
Expand Up @@ -25,6 +25,7 @@
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
Expand All @@ -39,6 +40,8 @@
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.KillNodeDef;
import org.apache.oozie.workflow.lite.NodeDef;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
Expand Down Expand Up @@ -220,6 +223,34 @@ protected Void execute() throws CommandException {
if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
}

// output message for Kill node
if (wfAction != null) { // wfAction could be a no-op job
NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
if (nodeDef instanceof KillNodeDef) {
ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, false);
try {
String tmpNodeConf = nodeDef.getConf();
String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
jobId, actionId, tmpNodeConf, actionConf);
if (wfAction.getErrorCode() != null) {
wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
} else {
wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
}
jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
}
catch (JPAExecutorException je) {
throw new CommandException(je);
}
catch (Exception ex) {
LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex);
throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
}
}
}

}
else {
for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
Expand Down
Expand Up @@ -435,7 +435,7 @@ protected void debug(String msgTemplate, Object... msgArgs) {
queueSize) {
// Forwards the message template and its arguments unchanged to `log`.
// NOTE(review): both a debug-level and a trace-level call are visible here. This text comes from a
// diff view, so these are presumably the removed and the added line of one change — confirm which
// single call is intended before relying on this block.
@Override
protected void debug(String msgTemplate, Object... msgArgs) {
log.debug(msgTemplate, msgArgs);
log.trace(msgTemplate, msgArgs);
}

@Override
Expand Down
Expand Up @@ -15,6 +15,7 @@
package org.apache.oozie.workflow;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.workflow.lite.NodeDef;

import java.util.Map;

Expand Down Expand Up @@ -185,4 +186,11 @@ public boolean isEndState() {
*/
public String getTransition(String node);

/**
 * Get the {@link NodeDef} associated with an execution path of this workflow instance.
 *
 * @param executionPath execution path identifying the node to look up.
 * @return the node definition for the given execution path.
 */
public NodeDef getNodeDef(String executionPath);

}
Expand Up @@ -305,6 +305,23 @@ else if (context.status == Status.SUSPENDED) {
return status.isEndState();
}

/**
 * Get NodeDef from workflow instance.
 * <p/>
 * The execution path is first resolved to its node instance, and the node instance name is then
 * looked up in the workflow definition. Either lookup may fail; in both cases the failure is logged
 * and <code>null</code> is returned, so callers must be prepared for a <code>null</code> result.
 *
 * @param executionPath execution path
 * @return node def, or <code>null</code> if the execution path is not known to this instance or the
 *         node name has no definition in the workflow definition.
 */
public NodeDef getNodeDef(String executionPath) {
    NodeInstance nodeJob = executionPaths.get(executionPath);
    if (nodeJob == null) {
        log.error("invalid execution path [{0}]", executionPath);
        // Without a node instance there is no node name to resolve; bail out instead of
        // dereferencing null below.
        return null;
    }
    NodeDef nodeDef = def.getNode(nodeJob.nodeName);
    if (nodeDef == null) {
        log.error("invalid transition [{0}]", nodeJob.nodeName);
    }
    return nodeDef;
}

public synchronized void fail(String nodeName) throws WorkflowException {
if (status.isEndState()) {
throw new WorkflowException(ErrorCode.E0718);
Expand Down Expand Up @@ -601,10 +618,12 @@ public String getTransition(String node) {
return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
}

/**
 * Compare this workflow instance with another object.
 * <p/>
 * The other object is equal when it is non-null, is an instance of this object's runtime class, and
 * its id equals the id of this instance.
 *
 * @param o object to compare with.
 * @return <code>true</code> if both refer to the same workflow instance id, <code>false</code> otherwise.
 */
@Override
public boolean equals(Object o) {
    if (o == null) {
        return false;
    }
    if (!getClass().isInstance(o)) {
        return false;
    }
    WorkflowInstance other = (WorkflowInstance) o;
    return other.getId().equals(instanceId);
}

/**
 * Return the hash code of this workflow instance, which is the hash code of its instance id.
 *
 * @return hash code derived from the instance id.
 */
@Override
public int hashCode() {
    final int idHash = instanceId.hashCode();
    return idHash;
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.executor.jpa.CoordActionGetForExternalIdJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
Expand Down Expand Up @@ -211,6 +212,52 @@ public void testEndDataNotSet() throws Exception {
_testDataNotSet("avoid-set-end-data", ActionEndCommand.END_DATA_MISSING);
}

/**
 * Submits the <code>wf-test-kill-node-message.xml</code> workflow configured so that it ends up
 * KILLED, then checks the error code, error message and status stored on the last action of the job.
 *
 * @throws Exception
 */
public void testKillNodeErrorMessage() throws Exception {
    // Stage the workflow definition in the test case directory.
    Reader wfSource = IOUtils.getResourceAsReader("wf-test-kill-node-message.xml", -1);
    Writer wfTarget = new FileWriter(getTestCaseDir() + "/workflow.xml");
    IOUtils.copyCharStream(wfSource, wfTarget);

    final DagEngine dagEngine = new DagEngine("u", "a");

    // Job configuration; the last three values are read by the workflow through wf:conf().
    Configuration jobConf = new XConfiguration();
    jobConf.set(OozieClient.APP_PATH, getTestCaseDir() + File.separator + "workflow.xml");
    jobConf.set(OozieClient.USER_NAME, getTestUser());
    jobConf.set(OozieClient.GROUP_NAME, getTestGroup());
    injectKerberosInfo(jobConf);
    jobConf.set(OozieClient.LOG_TOKEN, "t");
    jobConf.set("error", "end.error");
    jobConf.set("external-status", "FAILED/KILLED");
    jobConf.set("signal-value", "fail");

    final String jobId = dagEngine.submitJob(jobConf, true);

    final JPAService jpaService = Services.get().get(JPAService.class);
    final WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(jobId);

    // Poll until the workflow instance reports KILLED (or the timeout elapses).
    waitFor(50000, new Predicate() {
        public boolean evaluate() throws Exception {
            WorkflowJobBean polled = jpaService.execute(wfJobGetCmd);
            WorkflowInstance.Status current = polled.getWorkflowInstance().getStatus();
            return current == WorkflowInstance.Status.KILLED;
        }
    });

    WorkflowJobBean killedJob = jpaService.execute(wfJobGetCmd);
    assertEquals(WorkflowJob.Status.KILLED, killedJob.getStatus());

    List<WorkflowActionBean> jobActions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(jobId));

    // The last action in the list is the one whose error information is checked.
    WorkflowActionBean lastAction = jobActions.get(jobActions.size() - 1);
    assertEquals("TEST_ERROR", lastAction.getErrorCode());
    assertEquals("[end]", lastAction.getErrorMessage());
    assertEquals(WorkflowAction.Status.ERROR, lastAction.getStatus());
}

/**
* Provides functionality to test non transient failures.
*
Expand Down
38 changes: 38 additions & 0 deletions core/src/test/resources/wf-test-kill-node-message.xml
@@ -0,0 +1,38 @@
<!--
Copyright (c) 2010 Yahoo! Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!--
Test workflow "test-wf": start -> action "a" -> "end" on ok, or -> kill node "kill" on error.
-->
<workflow-app xmlns="uri:oozie:workflow:0.1" name="test-wf">

<start to="a"/>

<!-- Every element of the test action is filled from the submitted job configuration via wf:conf(). -->
<action name="a">
<test xmlns="uri:test">
<signal-value>${wf:conf('signal-value')}</signal-value>
<external-status>${wf:conf('external-status')}</external-status>
<error>${wf:conf('error')}</error>
<avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>
<avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>
<running-mode>${wf:conf('running-mode')}</running-mode>
</test>
<ok to="end"/>
<error to="kill"/>
</action>

<!-- The kill message is the EL expression wf:errorMessage(wf:lastErrorNode()) wrapped in square brackets. -->
<kill name="kill">
<message>[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>

<end name="end"/>

</workflow-app>
2 changes: 2 additions & 0 deletions release-log.txt
@@ -1,5 +1,7 @@
-- Oozie 3.1.0 release

OOZIE-113 merge changes for OOZIE-101 to ActionEndXCommand
OOZIE-112 workflow kill node message is not resolved and set it to action error message
OOZIE-111 adjust configuration for DBCP data source
OOZIE-105 bring JPA changes to master from integration branch
OOZIE-103 upgrade openjpa jar to 2.1.0
Expand Down

0 comments on commit 1fcf0e4

Please sign in to comment.