Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Closes OOZIE-10 add user-retry in workflow action

  • Loading branch information...
commit e7b4d9d0a2415e32148b3d8d9522709d57b13b14 1 parent 65baec9
Angelo Kaichen Huang authored Mohammad Kamrul Islam committed
Showing with 1,722 additions and 90 deletions.
  1. +22 −0 client/src/main/java/org/apache/oozie/client/WorkflowAction.java
  2. +285 −0 client/src/main/resources/oozie-workflow-0.3.xsd
  3. +3 −2 core/src/main/java/org/apache/oozie/ErrorCode.java
  4. +15 −0 core/src/main/java/org/apache/oozie/WorkflowActionBean.java
  5. +5 −1 core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
  6. +40 −0 core/src/main/java/org/apache/oozie/client/rest/JsonWorkflowAction.java
  7. +14 −7 core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
  8. +17 −9 core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
  9. +2 −1  core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
  10. +22 −3 core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
  11. +63 −24 core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
  12. +7 −3 core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
  13. +3 −0  core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java
  14. +3 −0  core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsGetForJobJPAExecutor.java
  15. +3 −0  core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsRunningGetJPAExecutor.java
  16. +3 −0  core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetActionsJPAExecutor.java
  17. +99 −3 core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
  18. +3 −0  core/src/main/java/org/apache/oozie/service/RecoveryService.java
  19. +5 −2 core/src/main/java/org/apache/oozie/service/SchemaService.java
  20. +3 −0  core/src/main/java/org/apache/oozie/store/WorkflowStore.java
  21. +12 −3 core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java
  22. +9 −1 core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
  23. +150 −24 core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java
  24. +53 −1 core/src/main/resources/oozie-default.xml
  25. +1 −1  core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
  26. +68 −1 core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
  27. +1 −1  core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
  28. +75 −1 core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java
  29. +125 −0 core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
  30. +38 −0 core/src/test/resources/wf-ext-schema-valid-user-retry.xml
  31. +572 −2 docs/src/site/twiki/WorkflowFunctionalSpec.twiki
  32. +1 −0  release-log.txt
View
22 client/src/main/java/org/apache/oozie/client/WorkflowAction.java
@@ -29,6 +29,7 @@
RUNNING,
OK,
ERROR,
+ USER_RETRY,
START_RETRY,
START_MANUAL,
DONE,
@@ -86,6 +87,27 @@
* @return the number of retries of the action.
*/
int getRetries();
+
+ /**
+ * Return the number of user retries of the action.
+ *
+ * @return the number of user retries of the action.
+ */
+ int getUserRetryCount();
+
+ /**
+ * Return the maximum number of user retries of the action.
+ *
+ * @return the maximum number of user retries of the action.
+ */
+ int getUserRetryMax();
+
+ /**
+ * Return the interval between user retries of the action, in minutes.
+ *
+ * @return the interval between user retries of the action, in minutes.
+ */
+ int getUserRetryInterval();
/**
* Return the start time of the action.
View
285 client/src/main/resources/oozie-workflow-0.3.xsd
@@ -0,0 +1,285 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:workflow="uri:oozie:workflow:0.3"
+ elementFormDefault="qualified" targetNamespace="uri:oozie:workflow:0.3">
+
+ <xs:element name="workflow-app" type="workflow:WORKFLOW-APP"/>
+
+ <xs:simpleType name="IDENTIFIER">
+ <!-- Identifier: a letter or underscore followed by letters, digits, '-' or '_'. -->
+ <xs:restriction base="xs:string">
+ <xs:pattern value="([a-zA-Z_]([\-_a-zA-Z0-9])*){1,39}"/>
+ </xs:restriction>
+ </xs:simpleType>
+
+ <xs:complexType name="WORKFLOW-APP">
+ <xs:sequence>
+ <xs:element name="credentials" type="workflow:CREDENTIALS" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="start" type="workflow:START" minOccurs="1" maxOccurs="1"/>
+ <xs:choice minOccurs="0" maxOccurs="unbounded">
+ <xs:element name="decision" type="workflow:DECISION" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="fork" type="workflow:FORK" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="join" type="workflow:JOIN" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="kill" type="workflow:KILL" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="action" type="workflow:ACTION" minOccurs="1" maxOccurs="1"/>
+ </xs:choice>
+ <xs:element name="end" type="workflow:END" minOccurs="1" maxOccurs="1"/>
+ <xs:any namespace="uri:oozie:sla:0.1" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="START">
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="END">
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="DECISION">
+ <xs:sequence>
+ <xs:element name="switch" type="workflow:SWITCH" minOccurs="1" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:element name="switch" type="workflow:SWITCH"/>
+
+ <xs:complexType name="SWITCH">
+ <xs:sequence>
+ <xs:sequence>
+ <xs:element name="case" type="workflow:CASE" minOccurs="1" maxOccurs="unbounded"/>
+ <xs:element name="default" type="workflow:DEFAULT" minOccurs="1" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="CASE">
+ <xs:simpleContent>
+ <xs:extension base="xs:string">
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:extension>
+ </xs:simpleContent>
+ </xs:complexType>
+
+ <xs:complexType name="DEFAULT">
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="FORK_TRANSITION">
+ <xs:attribute name="start" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="FORK">
+ <xs:sequence>
+ <xs:element name="path" type="workflow:FORK_TRANSITION" minOccurs="2" maxOccurs="unbounded"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="JOIN">
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:element name="kill" type="workflow:KILL"/>
+
+ <xs:complexType name="KILL">
+ <xs:sequence>
+ <xs:element name="message" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="ACTION_TRANSITION">
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:element name="map-reduce" type="workflow:MAP-REDUCE"/>
+ <xs:element name="pig" type="workflow:PIG"/>
+ <xs:element name="sub-workflow" type="workflow:SUB-WORKFLOW"/>
+ <xs:element name="fs" type="workflow:FS"/>
+ <xs:element name="java" type="workflow:JAVA"/>
+
+ <xs:complexType name="ACTION">
+ <xs:sequence>
+ <xs:choice minOccurs="1" maxOccurs="1">
+ <xs:element name="map-reduce" type="workflow:MAP-REDUCE" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="pig" type="workflow:PIG" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="sub-workflow" type="workflow:SUB-WORKFLOW" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="fs" type="workflow:FS" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="java" type="workflow:JAVA" minOccurs="1" maxOccurs="1"/>
+ <xs:any namespace="##other" minOccurs="1" maxOccurs="1"/>
+ </xs:choice>
+ <xs:element name="ok" type="workflow:ACTION_TRANSITION" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="error" type="workflow:ACTION_TRANSITION" minOccurs="1" maxOccurs="1"/>
+ <xs:any namespace="uri:oozie:sla:0.1" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ <xs:attribute name="cred" type="xs:string"/>
+ <xs:attribute name="retry-max" type="xs:string"/>
+ <xs:attribute name="retry-interval" type="xs:string"/>
+ </xs:complexType>
+
+ <xs:complexType name="MAP-REDUCE">
+ <xs:sequence>
+ <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="prepare" type="workflow:PREPARE" minOccurs="0" maxOccurs="1"/>
+ <xs:choice minOccurs="0" maxOccurs="1">
+ <xs:element name="streaming" type="workflow:STREAMING" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="pipes" type="workflow:PIPES" minOccurs="0" maxOccurs="1"/>
+ </xs:choice>
+ <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="PIG">
+ <xs:sequence>
+ <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="prepare" type="workflow:PREPARE" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="script" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="param" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="argument" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="SUB-WORKFLOW">
+ <xs:sequence>
+ <xs:element name="app-path" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="propagate-configuration" type="workflow:FLAG" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="FS">
+ <xs:sequence>
+ <xs:element name="delete" type="workflow:DELETE" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="mkdir" type="workflow:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="move" type="workflow:MOVE" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="chmod" type="workflow:CHMOD" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="JAVA">
+ <xs:sequence>
+ <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="prepare" type="workflow:PREPARE" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="main-class" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="java-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="capture-output" type="workflow:FLAG" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="FLAG"/>
+
+ <xs:complexType name="CONFIGURATION">
+ <xs:sequence>
+ <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="STREAMING">
+ <xs:sequence>
+ <xs:element name="mapper" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="reducer" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="record-reader" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="record-reader-mapping" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="env" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="PIPES">
+ <xs:sequence>
+ <xs:element name="map" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="reduce" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="inputformat" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="partitioner" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="writer" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="program" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="PREPARE">
+ <xs:sequence>
+ <xs:element name="delete" type="workflow:DELETE" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="mkdir" type="workflow:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="DELETE">
+ <xs:attribute name="path" type="xs:string" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="MKDIR">
+ <xs:attribute name="path" type="xs:string" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="MOVE">
+ <xs:attribute name="source" type="xs:string" use="required"/>
+ <xs:attribute name="target" type="xs:string" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="CHMOD">
+ <xs:attribute name="path" type="xs:string" use="required"/>
+ <xs:attribute name="permissions" type="xs:string" use="required"/>
+ <xs:attribute name="dir-files" type="xs:string"/>
+ </xs:complexType>
+
+ <xs:complexType name="CREDENTIALS">
+ <xs:sequence minOccurs="0" maxOccurs="unbounded">
+ <xs:element name="credential" type="workflow:CREDENTIAL"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="CREDENTIAL">
+ <xs:sequence minOccurs="0" maxOccurs="unbounded" >
+ <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+ </xs:sequence>
+ <xs:attribute name="name" type="xs:string" use="required"/>
+ <xs:attribute name="type" type="xs:string" use="required"/>
+ </xs:complexType>
+</xs:schema>
View
5 core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -143,7 +143,8 @@
E0816(XLog.STD, "Action pending=[{0}], status=[{1}]. Skipping ActionStart Execution"),
E0817(XLog.STD, "The wf action [{0}] has been udated recently. Ignoring ActionCheck."),
E0818(XLog.STD, "Action [{0}] status is running but WF Job [{1}] status is [{2}]. Expected status is RUNNING."),
- E0819(XLog.STD, "Unable to delete the temp dir of job WF Job [{1}]."),
+ E0819(XLog.STD, "Unable to delete the temp dir of job WF Job [{0}]."),
+ E0820(XLog.STD, "Action user retry max [{0}] is over system defined max [{1}], re-assign to use system max."),
E0900(XLog.OPS, "Jobtracker [{0}] not allowed, not in Oozie's whitelist"),
E0901(XLog.OPS, "Namenode [{0}] not allowed, not in Oozie's whitelist"),
@@ -171,7 +172,7 @@
E1020(XLog.STD, "Could not kill coord job, this job either finished successfully or does not exist , [{0}]"),
E1021(XLog.STD, "Coord Action Input Check Error: {0}"),
- E1100(XLog.STD, "Command precondition does not hold before execution"),
+ E1100(XLog.STD, "Command precondition does not hold before execution, [{0}]"),
E1101(XLog.STD, "SLA Nominal time is required."),
E1102(XLog.STD, "SLA should-start can't be empty."),
View
15 core/src/main/java/org/apache/oozie/WorkflowActionBean.java
@@ -158,6 +158,9 @@ public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong((pendingAge != null) ? pendingAge.getTime() : -1);
WritableUtils.writeStr(dataOutput, signalValue);
WritableUtils.writeStr(dataOutput, logToken);
+ dataOutput.writeInt(getUserRetryCount());
+ dataOutput.writeInt(getUserRetryInterval());
+ dataOutput.writeInt(getUserRetryMax());
}
/**
@@ -203,6 +206,9 @@ public void readFields(DataInput dataInput) throws IOException {
}
signalValue = WritableUtils.readStr(dataInput);
logToken = WritableUtils.readStr(dataInput);
+ setUserRetryCount(dataInput.readInt());
+ setUserRetryInterval(dataInput.readInt());
+ setUserRetryMax(dataInput.readInt());
}
/**
@@ -225,6 +231,15 @@ public boolean isRetryOrManual() {
return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
|| getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
}
+
+ /**
+ * Return true if the action status is USER_RETRY.
+ *
+ * @return true if the action status is USER_RETRY, false otherwise
+ */
+ public boolean isUserRetry() {
+ return (getStatus() == WorkflowAction.Status.USER_RETRY);
+ }
/**
* Return if the action is complete.
View
6 core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
@@ -536,7 +537,10 @@ public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction
boolean alreadyRunning = launcherId != null;
RunningJob runningJob;
- if (alreadyRunning) {
+ // if user-retry is on, always submit new launcher
+ boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
+
+ if (alreadyRunning && !isUserRetry) {
runningJob = jobClient.getJob(JobID.forName(launcherId));
if (runningJob == null) {
String jobTracker = launcherJobConf.get("mapred.job.tracker");
View
40 core/src/main/java/org/apache/oozie/client/rest/JsonWorkflowAction.java
@@ -58,6 +58,18 @@
@Basic
@Column(name = "retries")
private int retries;
+
+ @Basic
+ @Column(name = "user_retry_count")
+ private int userRetryCount;
+
+ @Basic
+ @Column(name = "user_retry_max")
+ private int userRetryMax;
+
+ @Basic
+ @Column(name = "user_retry_interval")
+ private int userRetryInterval;
@Transient
private Date startTime;
@@ -179,6 +191,34 @@ public int getRetries() {
public void setRetries(int retries) {
this.retries = retries;
}
+
+ public int getUserRetryCount() {
+ return userRetryCount;
+ }
+
+ public void setUserRetryCount(int retryCount) {
+ this.userRetryCount = retryCount;
+ }
+
+ public void incrmentUserRetryCount() {
+ this.userRetryCount++;
+ }
+
+ public int getUserRetryMax() {
+ return userRetryMax;
+ }
+
+ public void setUserRetryMax(int retryMax) {
+ this.userRetryMax = retryMax;
+ }
+
+ public int getUserRetryInterval() {
+ return userRetryInterval;
+ }
+
+ public void setUserRetryInterval(int retryInterval) {
+ this.userRetryInterval = retryInterval;
+ }
public Date getStartTime() {
return startTime;
View
21 core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
@@ -16,7 +16,6 @@
import java.sql.Timestamp;
import java.util.Date;
-
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
@@ -151,7 +150,8 @@ protected Void execute() throws CommandException {
ActionExecutorContext context = null;
try {
boolean isRetry = false;
- context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry);
+ boolean isUserRetry = false;
+ context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
incrActionCounter(wfAction.getType(), 1);
Instrumentation.Cron cron = new Instrumentation.Cron();
@@ -183,10 +183,15 @@ protected Void execute() throws CommandException {
LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
.getMessage(), ex);
+ wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
+
switch (ex.getErrorType()) {
case FAILED:
failAction(wfJob, wfAction);
break;
+ case ERROR:
+ handleUserRetry(wfAction);
+ break;
}
wfAction.setLastCheckTime(new Date());
try {
@@ -207,10 +212,12 @@ protected Void execute() throws CommandException {
}
private void failAction(WorkflowJobBean workflow, WorkflowActionBean action) throws CommandException {
- LOG.warn("Failing Job [{0}] due to failed action [{1}]", workflow.getId(), action.getId());
- action.resetPending();
- action.setStatus(Status.FAILED);
- workflow.setStatus(WorkflowJob.Status.FAILED);
- InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
+ if (!handleUserRetry(action)) {
+ LOG.warn("Failing Job [{0}] due to failed action [{1}]", workflow.getId(), action.getId());
+ action.resetPending();
+ action.setStatus(Status.FAILED);
+ workflow.setStatus(WorkflowJob.Status.FAILED);
+ InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
+ }
}
}
View
26 core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
@@ -133,7 +133,8 @@ protected Void execute() throws CommandException {
|| wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
isRetry = true;
}
- ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry);
+ boolean isUserRetry = false;
+ ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
try {
LOG.debug(
@@ -163,9 +164,8 @@ protected Void execute() throws CommandException {
}
wfAction.setRetries(0);
wfAction.setEndTime(new Date());
- jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
- jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+ boolean shouldHandleUserRetry = false;
Status slaStatus = null;
switch (wfAction.getStatus()) {
case OK:
@@ -176,21 +176,29 @@ protected Void execute() throws CommandException {
break;
case FAILED:
slaStatus = Status.FAILED;
+ shouldHandleUserRetry = true;
break;
case ERROR:
LOG.info("ERROR is considered as FAILED for SLA");
slaStatus = Status.KILLED;
+ shouldHandleUserRetry = true;
break;
default:
slaStatus = Status.FAILED;
+ shouldHandleUserRetry = true;
break;
}
- SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
- queue(new NotificationXCommand(wfJob, wfAction));
- LOG.debug(
- "Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
- + ", Set pending=" + wfAction.getPending());
- queue(new SignalXCommand(jobId, actionId));
+ if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
+ SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
+ queue(new NotificationXCommand(wfJob, wfAction));
+ LOG.debug(
+ "Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
+ + ", Set pending=" + wfAction.getPending());
+ queue(new SignalXCommand(jobId, actionId));
+ }
+
+ jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
}
catch (ActionExecutorException ex) {
LOG.warn(
View
3  core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
@@ -104,8 +104,9 @@ protected Void execute() throws CommandException {
if (executor != null) {
try {
boolean isRetry = false;
+ boolean isUserRetry = false;
ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction,
- isRetry);
+ isRetry, isUserRetry);
incrActionCounter(wfAction.getType(), 1);
Instrumentation.Cron cron = new Instrumentation.Cron();
View
25 core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -109,7 +109,10 @@ protected void verifyPrecondition() throws CommandException, PreconditionExcepti
}
if (wfAction.isPending()
&& (wfAction.getStatus() == WorkflowActionBean.Status.PREP
- || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL)) {
+ || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
+ || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL
+ || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY
+ )) {
if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString());
}
@@ -142,7 +145,11 @@ protected Void execute() throws CommandException {
|| wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
isRetry = true;
}
- context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry);
+ boolean isUserRetry = false;
+ if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
+ isUserRetry = true;
+ }
+ context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
try {
String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
@@ -177,6 +184,10 @@ protected Void execute() throws CommandException {
wfAction.setErrorInfo(null, null);
incrActionCounter(wfAction.getType(), 1);
+ LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
+ wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
+ .getUserRetryInterval());
+
Instrumentation.Cron cron = new Instrumentation.Cron();
cron.start();
executor.start(context, wfAction);
@@ -247,7 +258,7 @@ protected Void execute() throws CommandException {
failJob(context);
// update coordinator action
new CoordActionUpdateXCommand(wfJob, 3).call();
- new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
+ new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
SlaAppType.WORKFLOW_ACTION);
SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
@@ -293,4 +304,12 @@ private void handleError(ActionExecutorContext context, WorkflowJobBean workflow
return;
}
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.XCommand#getKey()
+ */
+ @Override
+ public String getKey(){
+ return getName() + "_" + actionId;
+ }
+
}
View
87 core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
@@ -20,6 +20,7 @@
import java.net.URISyntaxException;
import java.util.Date;
import java.util.Properties;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +39,7 @@
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
@@ -146,14 +148,17 @@ protected void handleError(ActionExecutor.Context context, ActionExecutor execut
LOG.warn("Setting Action Status to [{0}]", status);
ActionExecutorContext aContext = (ActionExecutorContext) context;
WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
- incrActionErrorCounter(action.getType(), "error", 1);
- action.setPending();
- if (isStart) {
- action.setExecutionData(message, null);
- queue(new ActionEndXCommand(action.getId(), action.getType()));
- }
- else {
- action.setEndData(status, WorkflowAction.Status.ERROR.toString());
+
+ if (!handleUserRetry(action)) {
+ incrActionErrorCounter(action.getType(), "error", 1);
+ action.setPending();
+ if (isStart) {
+ action.setExecutionData(message, null);
+ queue(new ActionEndXCommand(action.getId(), action.getType()));
+ }
+ else {
+ action.setEndData(status, WorkflowAction.Status.ERROR.toString());
+ }
}
}
@@ -166,24 +171,52 @@ protected void handleError(ActionExecutor.Context context, ActionExecutor execut
public void failJob(ActionExecutor.Context context) throws CommandException {
ActionExecutorContext aContext = (ActionExecutorContext) context;
WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
- incrActionErrorCounter(action.getType(), "failed", 1);
WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
- LOG.warn("Failing Job due to failed action [{0}]", action.getName());
- try {
- workflow.getWorkflowInstance().fail(action.getName());
- WorkflowInstance wfInstance = workflow.getWorkflowInstance();
- ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
- workflow.setWorkflowInstance(wfInstance);
- workflow.setStatus(WorkflowJob.Status.FAILED);
- action.setStatus(WorkflowAction.Status.FAILED);
- action.resetPending();
- queue(new NotificationXCommand(workflow, action));
- queue(new KillXCommand(workflow.getId()));
- InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
+
+ if (!handleUserRetry(action)) {
+ incrActionErrorCounter(action.getType(), "failed", 1);
+ LOG.warn("Failing Job due to failed action [{0}]", action.getName());
+ try {
+ workflow.getWorkflowInstance().fail(action.getName());
+ WorkflowInstance wfInstance = workflow.getWorkflowInstance();
+ ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
+ workflow.setWorkflowInstance(wfInstance);
+ workflow.setStatus(WorkflowJob.Status.FAILED);
+ action.setStatus(WorkflowAction.Status.FAILED);
+ action.resetPending();
+ queue(new NotificationXCommand(workflow, action));
+ queue(new KillXCommand(workflow.getId()));
+ InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
+ }
+ catch (WorkflowException ex) {
+ throw new CommandException(ex);
+ }
}
- catch (WorkflowException ex) {
- throw new CommandException(ex);
+ }
+
+ /**
+ * Schedule a user-retry for the action if the action is eligible for one.
+ * <p>
+ * The action is eligible when its error code is contained in the set returned by
+ * LiteWorkflowStoreService.getUserRetryErrorCode() and its user retry count is still below its user retry max.
+ * In that case the action status is set to USER_RETRY, its user retry count is incremented, it is marked
+ * pending, and an ActionStartXCommand is queued with the user retry interval converted from minutes to
+ * milliseconds (presumably used as the queue delay; verify against queue()).
+ * NOTE(review): the interval is computed in int arithmetic, so very large intervals could overflow.
+ *
+ * @param action the workflow action to check and, if eligible, re-queue for start.
+ * @return true if a user-retry was scheduled for this action, false otherwise
+ * @throws CommandException declared by this method; TODO confirm which call raises it
+ */
+ public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
+ String errorCode = action.getErrorCode();
+ Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
+
+ if (allowedRetryCode.contains(errorCode) && action.getUserRetryCount() < action.getUserRetryMax()) {
+ LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
+ + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
+ .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
+ int interval = action.getUserRetryInterval() * 60 * 1000;
+ action.setStatus(WorkflowAction.Status.USER_RETRY);
+ action.incrmentUserRetryCount();
+ action.setPending();
+ queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
+ return true;
}
+ return false;
}
private void incrActionErrorCounter(String type, String error, int count) {
@@ -207,14 +240,16 @@ protected void addActionCron(String type, Instrumentation.Cron cron) {
private Configuration protoConf;
private final WorkflowActionBean action;
private final boolean isRetry;
+ private final boolean isUserRetry;
private boolean started;
private boolean ended;
private boolean executed;
- public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry) {
+ public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) {
this.workflow = workflow;
this.action = action;
this.isRetry = isRetry;
+ this.isUserRetry = isUserRetry;
try {
protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
}
@@ -276,6 +311,10 @@ public boolean isRetry() {
return isRetry;
}
+ /**
+  * Return whether this context was created for a user-retry.
+  *
+  * @return the isUserRetry flag given to the constructor.
+  */
+ public boolean isUserRetry() {
+     return this.isUserRetry;
+ }
+
/**
* Returns whether setStartData has been called or not.
*
View
10 core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -228,15 +228,19 @@ protected Void execute() throws CommandException {
if (wfAction != null) { // wfAction could be a no-op job
NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
if (nodeDef instanceof KillNodeDef) {
- ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, false);
+ boolean isRetry = false;
+ boolean isUserRetry = false;
+ ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
+ isUserRetry);
try {
String tmpNodeConf = nodeDef.getConf();
String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
- jobId, actionId, tmpNodeConf, actionConf);
+ jobId, actionId, tmpNodeConf, actionConf);
if (wfAction.getErrorCode() != null) {
wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
- } else {
+ }
+ else {
wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
}
jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
View
3  core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java
@@ -98,6 +98,9 @@ private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) {
action.setStartTime(a.getStartTime());
action.setStatus(a.getStatus());
action.setJobId(a.getWfId());
+ action.setUserRetryCount(a.getUserRetryCount());
+ action.setUserRetryInterval(a.getUserRetryInterval());
+ action.setUserRetryMax(a.getUserRetryMax());
return action;
}
return null;
View
3  core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsGetForJobJPAExecutor.java
@@ -97,6 +97,9 @@ private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws
action.setStartTime(a.getStartTime());
action.setStatus(a.getStatus());
action.setJobId(a.getWfId());
+ action.setUserRetryCount(a.getUserRetryCount());
+ action.setUserRetryInterval(a.getUserRetryInterval());
+ action.setUserRetryMax(a.getUserRetryMax());
return action;
}
return null;
View
3  core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsRunningGetJPAExecutor.java
@@ -102,6 +102,9 @@ private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean bean){
action.setStartTime(bean.getStartTime());
action.setStatus(bean.getStatus());
action.setJobId(bean.getWfId());
+ action.setUserRetryCount(bean.getUserRetryCount());
+ action.setUserRetryInterval(bean.getUserRetryInterval());
+ action.setUserRetryMax(bean.getUserRetryMax());
return action;
}
return null;
View
3  core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetActionsJPAExecutor.java
@@ -91,6 +91,9 @@ private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws
action.setStartTime(a.getStartTime());
action.setStatus(a.getStatus());
action.setJobId(a.getWfId());
+ action.setUserRetryCount(a.getUserRetryCount());
+ action.setUserRetryInterval(a.getUserRetryInterval());
+ action.setUserRetryMax(a.getUserRetryMax());
return action;
}
return null;
View
102 core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
@@ -14,6 +14,7 @@
*/
package org.apache.oozie.service;
+import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.command.wf.ReRunXCommand;
import org.apache.oozie.client.WorkflowAction;
@@ -22,7 +23,6 @@
import org.apache.oozie.ErrorCode;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
-import org.apache.oozie.workflow.lite.ActionNodeDef;
import org.apache.oozie.workflow.lite.ActionNodeHandler;
import org.apache.oozie.workflow.lite.DecisionNodeHandler;
import org.apache.oozie.workflow.lite.NodeHandler;
@@ -32,10 +32,24 @@
import org.jdom.JDOMException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public abstract class LiteWorkflowStoreService extends WorkflowStoreService {
+ public static final String CONF_PREFIX = Service.CONF_PREFIX + "LiteWorkflowStoreService.";
+ public static final String CONF_PREFIX_USER_RETRY = CONF_PREFIX + "user.retry.";
+ public static final String CONF_USER_RETRY_MAX = CONF_PREFIX_USER_RETRY + "max";
+ public static final String CONF_USER_RETRY_INTEVAL = CONF_PREFIX_USER_RETRY + "inteval";
+ public static final String CONF_USER_RETRY_ERROR_CODE = CONF_PREFIX_USER_RETRY + "error.code";
+ public static final String CONF_USER_RETRY_ERROR_CODE_EXT = CONF_PREFIX_USER_RETRY + "error.code.ext";
+
+ public static final String NODE_DEF_VERSION_0 = "_oozie_inst_v_0";
+ public static final String NODE_DEF_VERSION_1 = "_oozie_inst_v_1";
+ public static final String CONF_NODE_DEF_VERSION = CONF_PREFIX + "node.def.version";
+
/**
* Delegation method used by the Action and Decision {@link NodeHandler} on start. <p/> This method provides the
* necessary information to create ActionExecutors.
@@ -56,7 +70,7 @@ protected static void liteExecute(NodeHandler.Context context) throws WorkflowEx
}
WorkflowActionBean action = new WorkflowActionBean();
String actionId = Services.get().get(UUIDService.class).generateChildId(jobId, nodeName);
-
+
if (!skipAction) {
String nodeConf = context.getNodeDef().getConf();
String executionPath = context.getExecutionPath();
@@ -80,7 +94,18 @@ protected static void liteExecute(NodeHandler.Context context) throws WorkflowEx
action.setJobId(jobId);
}
action.setCred(context.getNodeDef().getCred());
- log.debug("liteExecute: Setting the Auth type for action "+context.getNodeDef().getCred() + " Name: "+context.getNodeDef().getName());
+ log.debug("Setting action for cred: '"+context.getNodeDef().getCred() +
+ "', name: '"+ context.getNodeDef().getName() + "'");
+
+ action.setUserRetryCount(0);
+ int userRetryMax = getUserRetryMax(context);
+ int userRetryInterval = getUserRetryInterval(context);
+ action.setUserRetryMax(userRetryMax);
+ action.setUserRetryInterval(userRetryInterval);
+ log.debug("Setting action for userRetryMax: '"+ userRetryMax +
+ "', userRetryInterval: '" + userRetryInterval +
+ "', name: '"+ context.getNodeDef().getName() + "'");
+
action.setName(nodeName);
action.setId(actionId);
context.setVar(nodeName + WorkflowInstance.NODE_VAR_SEPARATOR + ACTION_ID, actionId);
@@ -92,6 +117,77 @@ protected static void liteExecute(NodeHandler.Context context) throws WorkflowEx
list.add(action);
}
+ /**
+  * Resolve the user-retry interval for the node being executed.
+  * <p/>
+  * The value defined on the node is used when the node definition carries one; otherwise the system value
+  * configured under {@link #CONF_USER_RETRY_INTEVAL} applies (5 when that property is not set).
+  *
+  * @param context the node handler context giving access to the node definition.
+  * @return the retry interval to store on the action.
+  * @throws WorkflowException with E0700 if the value defined on the node is not a valid integer.
+  */
+ private static int getUserRetryInterval(NodeHandler.Context context) throws WorkflowException {
+     Configuration systemConf = Services.get().get(ConfigurationService.class).getConf();
+     int systemInterval = systemConf.getInt(CONF_USER_RETRY_INTEVAL, 5);
+     String nodeInterval = context.getNodeDef().getUserRetryInterval();
+
+     // the string "null" is the marker for "not defined on the node"
+     if (nodeInterval.equals("null")) {
+         return systemInterval;
+     }
+     try {
+         return Integer.parseInt(nodeInterval);
+     }
+     catch (NumberFormatException nfe) {
+         throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
+     }
+ }
+
+ /**
+  * Resolve the maximum number of user-retries for the node being executed.
+  * <p/>
+  * When the node definition carries no value, user-retry is disabled for the action (0). Otherwise the value
+  * defined on the node is used, capped by the system maximum configured under {@link #CONF_USER_RETRY_MAX}
+  * (0 when that property is not set).
+  *
+  * @param context the node handler context giving access to the node definition.
+  * @return the user-retry max to store on the action.
+  * @throws WorkflowException with E0700 if the value defined on the node is not a valid integer.
+  */
+ private static int getUserRetryMax(NodeHandler.Context context) throws WorkflowException {
+     XLog log = XLog.getLog(LiteWorkflowStoreService.class);
+     Configuration conf = Services.get().get(ConfigurationService.class).getConf();
+     int max = conf.getInt(CONF_USER_RETRY_MAX, 0);
+     String userRetryMax = context.getNodeDef().getUserRetryMax();
+
+     // the string "null" is the marker for "not defined on the node"
+     if (userRetryMax.equals("null")) {
+         return 0;
+     }
+
+     int ret;
+     try {
+         ret = Integer.parseInt(userRetryMax);
+     }
+     catch (NumberFormatException nfe) {
+         throw new WorkflowException(ErrorCode.E0700, nfe.getMessage(), nfe);
+     }
+     if (ret > max) {
+         // Log before clamping so the warning shows the requested value next to the system maximum.
+         // NOTE(review): assumes the first argument of the E0820 template is the requested value - confirm.
+         log.warn(ErrorCode.E0820.getTemplate(), ret, max);
+         ret = max;
+     }
+     return ret;
+ }
+
+ /**
+ * Get system defined and instance defined error codes for which USER_RETRY is allowed
+ *
+ * @return set of error code user-retry is allowed for
+ */
+ public static Set<String> getUserRetryErrorCode() {
+ Configuration conf = Services.get().get(ConfigurationService.class).getConf();
+ Collection<String> strings = (Collection<String>) conf.getStringCollection(CONF_USER_RETRY_ERROR_CODE);
+ Collection<String> extra = (Collection<String>) conf.getStringCollection(CONF_USER_RETRY_ERROR_CODE_EXT);
+ Set<String> set = new HashSet<String>();
+ set.addAll(strings);
+ set.addAll(extra);
+ return set;
+ }
+
+ /**
+  * Return the NodeDef version configured under {@link #CONF_NODE_DEF_VERSION}, falling back to
+  * {@link #NODE_DEF_VERSION_1} when the property is not set.
+  *
+  * @return nodedef default version, _oozie_inst_v_0 or _oozie_inst_v_1
+  * @throws WorkflowException declared in the signature; the body shown here does not throw it.
+  */
+ public static String getNodeDefDefaultVersion() throws WorkflowException {
+     Configuration systemConf = Services.get().get(ConfigurationService.class).getConf();
+     String configured = systemConf.get(CONF_NODE_DEF_VERSION);
+     return (configured != null) ? configured : NODE_DEF_VERSION_1;
+ }
+
/**
* Delegation method used when failing actions. <p/>
*
View
3  core/src/main/java/org/apache/oozie/service/RecoveryService.java
@@ -365,6 +365,9 @@ else if (action.getStatus() == WorkflowActionBean.Status.OK
}
}
+ else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
+ queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
+ }
}
}
catch (Exception ex) {
View
7 core/src/main/java/org/apache/oozie/service/SchemaService.java
@@ -56,8 +56,11 @@
private Schema slaSchema;
- private static final String OOZIE_WORKFLOW_XSD[] = { "oozie-workflow-0.1.xsd", "oozie-workflow-0.2.xsd",
- "oozie-workflow-0.2.5.xsd" };
+ private static final String OOZIE_WORKFLOW_XSD[] = {
+ "oozie-workflow-0.1.xsd",
+ "oozie-workflow-0.2.xsd",
+ "oozie-workflow-0.2.5.xsd",
+ "oozie-workflow-0.3.xsd"};
private static final String OOZIE_COORDINATOR_XSD[] = { "oozie-coordinator-0.1.xsd", "oozie-coordinator-0.2.xsd", "oozie-coordinator-0.3.xsd"};
private static final String OOZIE_BUNDLE_XSD[] = { "oozie-bundle-0.1.xsd" };
private static final String OOZIE_SLA_SEMANTIC_XSD[] = { "gms-oozie-sla-0.1.xsd" };
View
3  core/src/main/java/org/apache/oozie/store/WorkflowStore.java
@@ -940,6 +940,9 @@ private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws
action.setStartTime(a.getStartTime());
action.setStatus(a.getStatus());
action.setJobId(a.getWfId());
+ action.setUserRetryCount(a.getUserRetryCount());
+ action.setUserRetryInterval(a.getUserRetryInterval());
+ action.setUserRetryMax(a.getUserRetryMax());
return action;
}
return null;
View
15 core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java
@@ -19,7 +19,11 @@
import java.util.Arrays;
-//TODO javadoc
+/**
+ * Node definition for workflow action. This node definition is serialized object and should provide
+ * readFields() and write() for read and write of fields in this class.
+ *
+ */
public class ActionNodeDef extends NodeDef {
ActionNodeDef() {
@@ -31,7 +35,12 @@ public ActionNodeDef(String name, String conf, Class<? extends ActionNodeHandler
}
public ActionNodeDef(String name, String conf, Class<? extends ActionNodeHandler> actionHandlerClass, String onOk,
- String onError,String cred) {
- super(name, ParamChecker.notNull(conf, "conf"), actionHandlerClass, Arrays.asList(onOk, onError),cred);
+ String onError, String cred) {
+ super(name, ParamChecker.notNull(conf, "conf"), actionHandlerClass, Arrays.asList(onOk, onError), cred);
+ }
+
+ public ActionNodeDef(String name, String conf, Class<? extends ActionNodeHandler> actionHandlerClass, String onOk,
+ String onError, String cred, String userRetryMax, String userRetryInterval) {
+ super(name, ParamChecker.notNull(conf, "conf"), actionHandlerClass, Arrays.asList(onOk, onError), cred, userRetryMax, userRetryInterval);
}
}
View
10 core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
@@ -56,6 +56,8 @@
private static final String NAME_A = "name";
private static final String CRED_A = "cred";
+ private static final String USER_RETRY_MAX_A = "retry-max";
+ private static final String USER_RETRY_INTERVAL_A = "retry-interval";
private static final String TO_A = "to";
private static final String FORK_PATH_E = "path";
@@ -196,9 +198,15 @@ private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowExcept
}
}
}
+
+ String credStr = eNode.getAttributeValue(CRED_A);
+ String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
+ String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
+
String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
- transitions[0], transitions[1], eNode.getAttributeValue(CRED_A)));
+ transitions[0], transitions[1], credStr,
+ userRetryMaxStr, userRetryIntervalStr));
}
else {
if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
View
174 core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java
@@ -15,8 +15,9 @@
package org.apache.oozie.workflow.lite;
import org.apache.hadoop.io.Writable;
+import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.util.ParamChecker;
-import org.apache.oozie.util.XLog;
+import org.apache.oozie.workflow.WorkflowException;
import java.io.DataInput;
import java.io.DataOutput;
@@ -25,13 +26,19 @@
import java.util.Collections;
import java.util.List;
-//TODO javadoc
+/**
+ * This node definition is serialized object and should provide readFields() and write() for read and write of fields in
+ * this class.
+ */
public class NodeDef implements Writable {
- private String name;
+ private String nodeDefVersion = null;
+ private String name = null;
private Class<? extends NodeHandler> handlerClass;
- private String conf;
+ private String conf = null;
private List<String> transitions = new ArrayList<String>();
- private String cred;
+ private String cred = "null";
+ private String userRetryMax = "null";
+ private String userRetryInterval = "null";
NodeDef() {
}
@@ -41,26 +48,31 @@
this.conf = conf;
this.handlerClass = ParamChecker.notNull(handlerClass, "handlerClass");
this.transitions = Collections.unmodifiableList(ParamChecker.notEmptyElements(transitions, "transitions"));
- this.cred = "null";
}
- NodeDef(String name, String conf, Class<? extends NodeHandler> handlerClass, List<String> transitions,String cred) {
- this.name = ParamChecker.notEmpty(name, "name");
- this.conf = conf;
- this.handlerClass = ParamChecker.notNull(handlerClass, "handlerClass");
- this.transitions = Collections.unmodifiableList(ParamChecker.notEmptyElements(transitions, "transitions"));
- if(cred != null){
+ NodeDef(String name, String conf, Class<? extends NodeHandler> handlerClass, List<String> transitions, String cred) {
+ this(name, conf, handlerClass, transitions);
+ if (cred != null) {
this.cred = cred;
}
- else{
- this.cred = "null";
+ }
+
+ NodeDef(String name, String conf, Class<? extends NodeHandler> handlerClass, List<String> transitions, String cred,
+ String userRetryMax, String userRetryInterval) {
+ this(name, conf, handlerClass, transitions, cred);
+ if (userRetryMax != null) {
+ this.userRetryMax = userRetryMax;
+ }
+ if (userRetryInterval != null) {
+ this.userRetryInterval = userRetryInterval;
}
}
-
+
/**
 * Compare this node definition with another object.
 * <p/>
 * Two node definitions are equal when they are of exactly the same class and have the same name, which
 * matches {@link #hashCode()} being computed from the name. This overrides {@link Object#equals(Object)}
 * so that collections see the same notion of equality as direct callers.
 *
 * @param other the object to compare with, may be null.
 * @return true if other is a node definition of the same class with the same name.
 */
@Override
public boolean equals(Object other) {
    if (other == null || getClass() != other.getClass()) {
        return false;
    }
    return getName().equals(((NodeDef) other).getName());
}

/**
 * Typed variant kept for existing callers; it gives the same result as {@link #equals(Object)}.
 *
 * @param other the node definition to compare with, may be null.
 * @return true if other is of the same class and has the same name.
 */
public boolean equals(NodeDef other) {
    return equals((Object) other);
}
+ @Override
public int hashCode() {
return name.hashCode();
}
@@ -69,9 +81,6 @@ public String getName() {
return name;
}
- /**
- * @return the auth
- */
public String getCred() {
return cred;
}
@@ -88,9 +97,57 @@ public String getConf() {
return conf;
}
- @Override
+ /**
+  * Return the user-retry max defined for this node, as a string.
+  *
+  * @return the value given at construction, or the string "null" when none was given.
+  */
+ public String getUserRetryMax() {
+     return this.userRetryMax;
+ }
+
+ /**
+  * Return the user-retry interval defined for this node, as a string.
+  *
+  * @return the value given at construction, or the string "null" when none was given.
+  */
+ public String getUserRetryInterval() {
+     return this.userRetryInterval;
+ }
+
+ /**
+  * Return the serialization version of this node definition.
+  * <p/>
+  * When no version has been set yet, it is initialized from
+  * {@link LiteWorkflowStoreService#getNodeDefDefaultVersion()}, or to version one if that lookup fails.
+  *
+  * @return the node definition version, never null once resolved.
+  */
+ public String getNodeDefVersion() {
+     if (nodeDefVersion != null) {
+         return nodeDefVersion;
+     }
+     try {
+         nodeDefVersion = LiteWorkflowStoreService.getNodeDefDefaultVersion();
+     }
+     catch (WorkflowException e) {
+         // fall back to the current format when the configured default cannot be obtained
+         nodeDefVersion = LiteWorkflowStoreService.NODE_DEF_VERSION_1;
+     }
+     return nodeDefVersion;
+ }
+
+ /**
+ * Read this node definition from a stream in version-zero format.
+ * <p/>
+ * Fields are read in this order: name (unless it was already consumed as the first field), cred, handler
+ * class name, conf, number of transitions, then each transition. Version zero carries no user-retry fields.
+ *
+ * @param dataInput the input to read the remaining fields from.
+ * @param firstField the first UTF field of the stream, already read by the caller.
+ * @throws IOException thrown if a field cannot be read or the handler class cannot be loaded.
+ */
@SuppressWarnings("unchecked")
- public void readFields(DataInput dataInput) throws IOException {
+ private void readVersionZero(DataInput dataInput, String firstField) throws IOException {
+ // If the first field is the version-zero marker the name follows it; otherwise the first field is
+ // itself the name (presumably a stream written before version markers existed).
+ if (firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_0)) {
+ name = dataInput.readUTF();
+ } else {
+ name = firstField;
+ }
+ nodeDefVersion = LiteWorkflowStoreService.NODE_DEF_VERSION_0;
+ cred = dataInput.readUTF();
+ String handlerClassName = dataInput.readUTF();
+ if ((handlerClassName != null) && (handlerClassName.length() > 0)) {
+ try {
+ handlerClass = (Class<? extends NodeHandler>) Class.forName(handlerClassName);
+ }
+ catch (ClassNotFoundException ex) {
+ throw new IOException(ex);
+ }
+ }
+ conf = dataInput.readUTF();
+ // the string "null" is the serialized form of a missing conf
+ if (conf.equals("null")) {
+ conf = null;
+ }
+ int numTrans = dataInput.readInt();
+ transitions = new ArrayList<String>(numTrans);
+ for (int i = 0; i < numTrans; i++) {
+ transitions.add(dataInput.readUTF());
+ }
+ }
+ @SuppressWarnings("unchecked")
+ private void readVersionOne(DataInput dataInput, String firstField) throws IOException {
+ nodeDefVersion = LiteWorkflowStoreService.NODE_DEF_VERSION_1;
name = dataInput.readUTF();
cred = dataInput.readUTF();
String handlerClassName = dataInput.readUTF();
@@ -111,17 +168,33 @@ public void readFields(DataInput dataInput) throws IOException {
for (int i = 0; i < numTrans; i++) {
transitions.add(dataInput.readUTF());
}
+ userRetryMax = dataInput.readUTF();
+ userRetryInterval = dataInput.readUTF();
}
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ *
+ * Reads the first UTF field and uses it to select the format: the version-one marker selects
+ * readVersionOne(), anything else is handed to readVersionZero() together with the field just read.
+ */
@Override
- public void write(DataOutput dataOutput) throws IOException {
+ public void readFields(DataInput dataInput) throws IOException {
+ // The first field is passed on because, for version zero, it may already be the node name.
+ String firstField = dataInput.readUTF();
+ if (!firstField.equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) {
+ readVersionZero(dataInput, firstField);
+ } else {
+ //since oozie version 3.1
+ readVersionOne(dataInput, firstField);
+ }
+ }
+
+ private void writeVersionZero(DataOutput dataOutput) throws IOException {
+ dataOutput.writeUTF(nodeDefVersion);
dataOutput.writeUTF(name);
- if(cred != null){
+ if (cred != null) {
dataOutput.writeUTF(cred);
- }else{
+ }
+ else {
dataOutput.writeUTF("null");
}
- XLog.getLog(getClass()).debug("write: Name:" + name +" Cred: "+ cred);
dataOutput.writeUTF(handlerClass.getName());
if (conf != null) {
dataOutput.writeUTF(conf);
@@ -135,4 +208,57 @@ public void write(DataOutput dataOutput) throws IOException {
}
}
+ /**
+  * Serialize this node definition in version-one format (in use since 3.1).
+  * <p/>
+  * Fields are written in this order: version, name, cred, handler class name, conf, number of transitions,
+  * each transition, user-retry max, user-retry interval. A missing cred, conf, user-retry max or user-retry
+  * interval is written as the string "null".
+  *
+  * @param dataOutput data output to serialize node def
+  * @throws IOException thrown if fail to write
+  */
+ private void writeVersionOne(DataOutput dataOutput) throws IOException {
+     dataOutput.writeUTF(nodeDefVersion);
+     dataOutput.writeUTF(name);
+     dataOutput.writeUTF(cred != null ? cred : "null");
+     dataOutput.writeUTF(handlerClass.getName());
+     dataOutput.writeUTF(conf != null ? conf : "null");
+     dataOutput.writeInt(transitions.size());
+     for (String target : transitions) {
+         dataOutput.writeUTF(target);
+     }
+     dataOutput.writeUTF(userRetryMax != null ? userRetryMax : "null");
+     dataOutput.writeUTF(userRetryInterval != null ? userRetryInterval : "null");
+ }
+
+ /* (non-Javadoc)
+  * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+  *
+  * Picks the output format from the node definition version: version one (since oozie version 3.1)
+  * when the version is the version-one marker, version zero for anything else.
+  */
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+     if (getNodeDefVersion().equals(LiteWorkflowStoreService.NODE_DEF_VERSION_1)) {
+         writeVersionOne(dataOutput);
+         return;
+     }
+     writeVersionZero(dataOutput);
+ }
+
}
View
54 core/src/main/resources/oozie-default.xml
@@ -1029,7 +1029,7 @@
<!-- SchemaService -->
- <property>
+ <property>
<name>oozie.service.SchemaService.wf.ext.schemas</name>
<value>oozie-sla-0.1.xsd</value>
<description>
@@ -1039,6 +1039,7 @@
if empty Configuration assumes it is NULL.
</description>
</property>
+
<property>
<name>oozie.service.SchemaService.coord.ext.schemas</name>
<value>oozie-sla-0.1.xsd</value>
@@ -1049,6 +1050,7 @@
if empty Configuration assumes it is NULL.
</description>
</property>
+
<property>
<name>oozie.service.SchemaService.sla.ext.schemas</name>
<value> </value>
@@ -1331,5 +1333,55 @@
Default timeout (in milliseconds) for commands for acquiring an exclusive lock on an entity.
</description>
</property>
+
+ <!-- LiteWorkflowStoreService, Workflow Action Automatic Retry -->
+
+ <property>
+ <name>oozie.service.LiteWorkflowStoreService.user.retry.max</name>
+ <value>3</value>
+ <description>
+ Maximum number of automatic retries for a workflow action. The default is 3.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.LiteWorkflowStoreService.user.retry.inteval</name>
+ <value>10</value>
+ <description>
+ Automatic retry interval for workflow action is in minutes and the default value is 10 minutes.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.LiteWorkflowStoreService.user.retry.error.code</name>
+ <value>
+ JA017,
+ JA018,
+ FS009,
+ FS008
+ </value>
+ <description>
+ Automatic retry of a workflow action is performed only for these error codes:
+ FS009 and FS008 are file-exists errors when using chmod in the fs action.
+ JA018 is the output-directory-exists error in the workflow map-reduce action.
+ JA017 is the job-does-not-exist error in the action executor.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.LiteWorkflowStoreService.user.retry.error.code.ext</name>
+ <value> </value>
+ <description>
+ Automatic retry of a workflow action is also performed for these additional error codes.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.LiteWorkflowStoreService.node.def.version</name>
+ <value>_oozie_inst_v_1</value>
+ <description>
+ NodeDef default version, _oozie_inst_v_0 or _oozie_inst_v_1
+ </description>
+ </property>
</configuration>
View
2  core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
@@ -194,7 +194,7 @@ public void testActionCheck() throws Exception {
new ActionStartXCommand(action.getId(), "map-reduce").call();
action = jpaService.execute(wfActionGetCmd);
- ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false);
+ ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
String user = conf.get("user.name");
View
69 core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
@@ -38,6 +38,7 @@
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowStoreService;
@@ -58,6 +59,7 @@
protected void setUp() throws Exception {
super.setUp();
setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, "wf-ext-schema.xsd");
+ setSystemProperty(LiteWorkflowStoreService.CONF_USER_RETRY_ERROR_CODE_EXT, ForTestingActionExecutor.TEST_ERROR);
services = new Services();
services.init();
cleanUpDBTables();
@@ -191,7 +193,29 @@ public void testEndError() throws Exception {
_testError("end.error", "ok", "OK");
assertTrue(true);
}
-
+
+ /**
+ * Tests for correct functionality when a {@link org.apache.oozie.action.ActionExecutorException.ErrorType#ERROR} is
+ * generated when executing start. </p> Checks for user retry is applied to actions for specified retry-max=2.
+ *
+ * @throws Exception
+ */
+ public void testStartErrorWithUserRetry() throws Exception {
+ _testErrorWithUserRetry("start.error", "error", "based_on_action_status");
+ assertTrue(true);
+ }
+
+ /**
+ * Tests for correct functionality when a {@link org.apache.oozie.action.ActionExecutorException.ErrorType#ERROR} is
+ * generated when executing end. </p> Checks for user retry is applied to actions for specified retry-max=2.
+ *
+ * @throws Exception
+ */
+ public void testEndErrorWithUserRetry() throws Exception {
+ _testErrorWithUserRetry("end.error", "ok", "OK");
+ assertTrue(true);
+ }
+
/**
* Tests for the job to be KILLED and status set to FAILED in case an Action Handler does not call setExecutionData
* in it's start() implementation.
@@ -512,6 +536,49 @@ public boolean evaluate() throws Exception {
store.commitTrx();
store.closeTrx();
}
+
+ /**
+  * Provides functionality to test user retry.
+  * <p/>
+  * Submits the workflow defined in wf-ext-schema-valid-user-retry.xml and waits until the first action of
+  * the job reports a user-retry count of 2.
+  *
+  * @param errorType the error type (the tests in this class pass start.error or end.error).
+  * @param externalStatus the external status to set.
+  * @param signalValue the signal value to set.
+  * @throws Exception
+  */
+ private void _testErrorWithUserRetry(String errorType, String externalStatus, String signalValue) throws Exception {
+     Reader reader = IOUtils.getResourceAsReader("wf-ext-schema-valid-user-retry.xml", -1);
+     Writer writer = new FileWriter(getTestCaseDir() + "/workflow.xml");
+     IOUtils.copyCharStream(reader, writer);
+
+     final DagEngine engine = new DagEngine("u", "a");
+     Configuration conf = new XConfiguration();
+     conf.set(OozieClient.APP_PATH, getTestCaseDir() + File.separator + "workflow.xml");
+     conf.set(OozieClient.USER_NAME, getTestUser());
+     conf.set(OozieClient.GROUP_NAME, getTestGroup());
+     injectKerberosInfo(conf);
+     conf.set(OozieClient.LOG_TOKEN, "t");
+     conf.set("error", errorType);
+     conf.set("external-status", externalStatus);
+     conf.set("signal-value", signalValue);
+
+     final String jobId = engine.submitJob(conf, true);
+
+     final JPAService jpaService = Services.get().get(JPAService.class);
+
+     final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
+     waitFor(5000, new Predicate() {
+         public boolean evaluate() throws Exception {
+             List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+             // the action may not have been created yet; keep polling instead of failing on get(0)
+             return !actions.isEmpty() && actions.get(0).getUserRetryCount() == 2;
+         }
+     });
+
+     List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+     assertTrue(actions.size() > 0);
+     WorkflowActionBean action = actions.get(0);
+     assertEquals(2, action.getUserRetryCount());
+ }
/**
* Provides functionality to test for set*Data calls not being made by the Action Handler.
View
2  core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
@@ -150,7 +150,7 @@ public void testActionStart() throws Exception {
action = jpaService.execute(wfActionGetCmd);
assertNotNull(action.getExternalId());
- ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false);
+ ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
String user = conf.get("user.name");
View
76 core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java
@@ -24,7 +24,10 @@
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.UUIDService;
+import org.apache.oozie.service.UUIDService.ApplicationType;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.workflow.WorkflowInstance;
@@ -75,7 +78,7 @@ public void testWfKillSuccess1() throws Exception {
wfInstance = job.getWorkflowInstance();
assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.KILLED);
}
-
+
/**
* Test : kill RUNNING job and RUNNING action successfully.
*
@@ -108,6 +111,77 @@ public void testWfKillSuccess2() throws Exception {
}
/**
+ * Test : kill RUNNING job after NodeDef upgrade.
+ * <p/>
+ * The job and action records are created while the services run with NodeDef version zero; the services
+ * are then restarted with NodeDef version one before the job is killed.
+ *
+ * @throws Exception
+ */
+ public void testWfKillSuccessAfterNodeDefUpgrade() throws Exception {
+ services.destroy();
+
+ // restart the services with NodeDef version zero as the default version
+ setSystemProperty(LiteWorkflowStoreService.CONF_NODE_DEF_VERSION, LiteWorkflowStoreService.NODE_DEF_VERSION_0);
+ services = new Services();
+ services.init();
+
+ WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+ WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP);
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+ WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId());
+
+ job = jpaService.execute(wfJobGetCmd);
+ action = jpaService.execute(wfActionGetCmd);
+ assertEquals(job.getStatus(), WorkflowJob.Status.RUNNING);
+ assertEquals(action.getStatus(), WorkflowAction.Status.PREP);
+ WorkflowInstance wfInstance = job.getWorkflowInstance();
+ assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.RUNNING);
+
+ services.destroy();
+
+ Thread.sleep(5000);
+
+ // restart the services with NodeDef version one as the default version (the "upgrade")
+ setSystemProperty(LiteWorkflowStoreService.CONF_NODE_DEF_VERSION, LiteWorkflowStoreService.NODE_DEF_VERSION_1);
+ services = new Services();
+ services.init();
+
+ Thread.sleep(5000);
+
+ new KillXCommand(job.getId()).call();
+
+ // JPAService has to be looked up again because the services were re-created
+ jpaService = Services.get().get(JPAService.class);
+ job = jpaService.execute(wfJobGetCmd);
+ action = jpaService.execute(wfActionGetCmd);
+ assertEquals(job.getStatus(), WorkflowJob.Status.KILLED);
+ assertEquals(action.getStatus(), WorkflowAction.Status.KILLED);
+ wfInstance = job.getWorkflowInstance();
+ assertEquals(wfInstance.getStatus(), WorkflowInstance.Status.KILLED);
+ }
+
+
+ /**
+  * Test : a child id generated from a workflow id yields back the parent id and the child name, for both
+  * the "counter" and the "random" UUID generators.
+  *
+  * @throws Exception
+  */
+ public void testChildId() throws Exception {
+     for (String generator : new String[] { "counter", "random" }) {
+         setSystemProperty(UUIDService.CONF_GENERATOR, generator);
+         Services services = new Services();
+         services.init();
+         UUIDService uuid = services.get(UUIDService.class);
+         String id = uuid.generateId(ApplicationType.WORKFLOW);
+         String childId = uuid.generateChildId(id, "a");
+         assertEquals(id, uuid.getId(childId));
+         assertEquals("a", uuid.getChildName(childId));
+         services.destroy();
+     }
+ }
+
+ /**
* Test : kill job but failed to kill an already successful action.
*
* @throws Exception
View
125 core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
@@ -17,6 +17,7 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.StringReader;
@@ -24,7 +25,12 @@
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorEngine;
import org.apache.oozie.CoordinatorJobBean;
@@ -32,25 +38,36 @@
import org.apache.oozie.ForTestingActionExecutor;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.hadoop.LauncherMapper;
+import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
+import org.apache.oozie.action.hadoop.MapperReducerForTest;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.CoordinatorJob.Execution;
+import org.apache.oozie.command.wf.ActionStartXCommand;
+import org.apache.oozie.command.wf.ActionXCommand;
+import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.RecoveryService.RecoveryRunnable;
import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.store.WorkflowStore;
import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.test.XTestCase.Predicate;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowInstance;
public class TestRecoveryService extends XDataTestCase {
@@ -162,6 +179,55 @@ public boolean evaluate() throws Exception {
store3.commitTrx();
store3.closeTrx();
}
+
+ /**
+  * Exercises the Recovery Service runnable against a workflow action stored with USER_RETRY status.
+  * <p/>
+  * After the recovery runnable has run, the action is expected to get an external id and to be in RUNNING status.
+  * The launcher job identified by that external id is then expected to complete successfully with an id swap.
+  *
+  * @throws Exception
+  */
+ public void testWorkflowActionRecoveryUserRetry() throws Exception {
+     final JPAService jpa = Services.get().get(JPAService.class);
+     WorkflowJobBean wfJob = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+     WorkflowActionBean wfAction = this.addRecordToWfActionTable(wfJob.getId(), "1",
+             WorkflowAction.Status.USER_RETRY);
+
+     new RecoveryRunnable(0, 60, 60).run();
+     Thread.sleep(3000);
+
+     final WorkflowActionGetJPAExecutor actionGetter = new WorkflowActionGetJPAExecutor(wfAction.getId());
+
+     // poll until the action has been handed an external (launcher job) id
+     waitFor(5000, new Predicate() {
+         public boolean evaluate() throws Exception {
+             WorkflowActionBean current = jpa.execute(actionGetter);
+             return current.getExternalId() != null;
+         }
+     });
+     wfAction = jpa.execute(actionGetter);
+     assertNotNull(wfAction.getExternalId());
+     assertEquals(WorkflowAction.Status.RUNNING, wfAction.getStatus());
+
+     ActionExecutorContext execContext = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, false, false);
+     MapReduceActionExecutor mrExecutor = new MapReduceActionExecutor();
+     Configuration hadoopConf = mrExecutor.createBaseHadoopConf(execContext,
+             XmlUtils.parseXml(wfAction.getConf()));
+     String userName = hadoopConf.get("user.name");
+     String groupName = hadoopConf.get("group.name");
+     HadoopAccessorService accessor = Services.get().get(HadoopAccessorService.class);
+     JobClient client = accessor.createJobClient(userName, groupName, new JobConf(hadoopConf));
+
+     final RunningJob launcher = client.getJob(JobID.forName(wfAction.getExternalId()));
+
+     // give the launcher job up to two minutes to finish
+     waitFor(120 * 1000, new Predicate() {
+         public boolean evaluate() throws Exception {
+             return launcher.isComplete();
+         }
+     });
+     assertTrue(launcher.isSuccessful());
+     assertTrue(LauncherMapper.hasIdSwap(launcher));
+ }
+
/**
* Tests functionality of the Recovery Service Runnable command. </p> Insert a coordinator job with RUNNING and
@@ -628,4 +694,63 @@ private void addRecordToJobTable(String jobId, CoordinatorStore store, String ba
throw se;
}
}
+
+ /**
+  * Inserts a workflow action record for the given workflow job, using the bean built by
+  * <code>createWorkflowActionSetPending()</code>.
+  *
+  * @param wfId id of the workflow job the action belongs to.
+  * @param actionName not used by this override.
+  * @param status status to store on the action.
+  * @return the action bean that was inserted.
+  * @throws Exception thrown if the bean cannot be created or inserted.
+  */
+ @Override
+ protected WorkflowActionBean addRecordToWfActionTable(String wfId, String actionName, WorkflowAction.Status status)
+ throws Exception {
+     WorkflowActionBean wfAction = createWorkflowActionSetPending(wfId, status);
+     try {
+         JPAService jpa = Services.get().get(JPAService.class);
+         assertNotNull(jpa);
+         jpa.execute(new WorkflowActionInsertJPAExecutor(wfAction));
+     }
+     catch (JPAExecutorException insertError) {
+         insertError.printStackTrace();
+         fail("Unable to insert the test wf action record to table");
+         throw insertError;
+     }
+     return wfAction;
+ }
+
+ /**
+  * Builds (without persisting) a pending "map-reduce" workflow action bean named "testAction" for the given
+  * workflow job, with user-retry count 1, user-retry max 2 and user-retry interval 1.
+  * <p/>
+  * As a side effect, writes a two-line <code>data.txt</code> file into the test case "input" directory, which the
+  * action configuration references as <code>mapred.input.dir</code>.
+  *
+  * @param wfId id of the workflow job the action belongs to.
+  * @param status status to set on the action.
+  * @return the populated action bean.
+  * @throws Exception thrown if the input file cannot be written.
+  */
+ protected WorkflowActionBean createWorkflowActionSetPending(String wfId, WorkflowAction.Status status)
+ throws Exception {
+     WorkflowActionBean action = new WorkflowActionBean();
+     String actionname = "testAction";
+     action.setName(actionname);
+     action.setCred("null");
+     action.setId(Services.get().get(UUIDService.class).generateChildId(wfId, actionname));
+     action.setJobId(wfId);
+     action.setType("map-reduce");
+     action.setTransition("transition");
+     action.setStatus(status);
+     action.setStartTime(new Date());
+     action.setEndTime(new Date());
+     action.setLastCheckTime(new Date());
+     action.setPending();
+     action.setUserRetryCount(1);
+     action.setUserRetryMax(2);
+     action.setUserRetryInterval(1);
+
+     Path inputDir = new Path(getFsTestCaseDir(), "input");
+     Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+     FileSystem fs = getFileSystem();
+     Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+     try {
+         w.write("dummy\n");
+         w.write("dummy\n");
+     }
+     finally {
+         // release the file system stream even if a write fails
+         w.close();
+     }
+
+     String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+             + getNameNodeUri() + "</name-node>" + "<configuration>"
+             + "<property><name>mapred.mapper.class</name><value>" + MapperReducerForTest.class.getName()
+             + "</value></property>" + "<property><name>mapred.reducer.class</name><value>"
+             + MapperReducerForTest.class.getName() + "</value></property>"
+             + "<property><name>mapred.input.dir</name><value>" + inputDir.toString() + "</value></property>"
+             + "<property><name>mapred.output.dir</name><value>" + outputDir.toString() + "</value></property>"
+             + "</configuration>" + "</map-reduce>";
+     action.setConf(actionXml);
+
+     return action;
+ }
}
View
38 core/src/test/resources/wf-ext-schema-valid-user-retry.xml
@@ -0,0 +1,38 @@
+<!--
+ Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.3" name="test-wf">
+
+ <start to="a"/>
+
+ <action name="a" retry-max="2" retry-interval="0">
+ <test xmlns="uri:test">
+ <signal-value>${wf:conf('signal-value')}</signal-value>
+ <external-status>${wf:conf('external-status')}</external-status>
+ <error>${wf:conf('error')}</error>
+ <avoid-set-execution-data>${wf:conf('avoid-set-execution-data')}</avoid-set-execution-data>
+ <avoid-set-end-data>${wf:conf('avoid-set-end-data')}</avoid-set-end-data>
+ <running-mode>${wf:conf('running-mode')}</running-mode>
+ </test>
+ <ok to="end"/>
+ <error to="kill"/>
+ </action>
+
+ <kill name="kill">
+ <message>kill</message>
+ </kill>
+
+ <end name="end"/>
+
+</workflow-app>
View
574 docs/src/site/twiki/WorkflowFunctionalSpec.twiki
@@ -5,12 +5,11 @@
-----
---+!! Oozie Specification, a Hadoop Workflow System
-*<center>(PROPOSAL v1.0-2009MAY21)</center>*
+*<center>(v3.1)</center>*
The goal of this document is to define a workflow engine system specialized in coordinating the execution of Hadoop
Map/Reduce and Pig jobs.
-Author: Alejandro Abdelnur
%TOC%
@@ -21,6 +20,7 @@ Author: Alejandro Abdelnur
---+++!! 2011AUG12
* #3.2.4 fs 'move' action characteristics updated, to allow for consistent source and target paths and existing target path only if directory
+ * #18, Update the doc for user-retry of workflow action.
---+++!! 2011FEB19
* #10, Update the doc to rerun from the failed node.
@@ -2276,11 +2276,581 @@ A workflow job can specify a share library path using the job property =oozie.li
A workflow job can use the system share library by setting the job property =oozie.use.system.libpath= to =true=.
+---++ 18 User-Retry for Workflow Actions (since Oozie 3.1)
+
+Oozie provides User-Retry capabilities when an action is in =ERROR= or =FAILED= state.
+
+Depending on the nature of the failure, Oozie defines which types of errors are allowed for User-Retry. Certain errors
+are allowed for User-Retry by default, for example, file-exists-error =FS009, FS008= when using chmod in workflow =fs=
+action, output-directory-exists-error =JA018= in workflow =map-reduce= action, and job-not-exists-error =JA017= in action executor.
+
+User-Retry allows the user to specify a number of retries (which must not exceed the system max retries), so the user can find the cause of
+the problem and fix it while the action is in =USER_RETRY= state. If the failure or error does not go away after the max retries,
+the action becomes =FAILED= or =ERROR= and Oozie marks the workflow job as =FAILED= or =KILLED=.
+
+The Oozie administrator can allow more error codes to be handled for User-Retry. By adding the configuration property
+=oozie.service.LiteWorkflowStoreService.user.retry.error.code.ext= to =oozie-site.xml=
+with the error codes as its value, these error codes will be eligible for User-Retry after a system restart.
+
+An example of User-Retry in a workflow action is:
+
+<verbatim>
+<workflow-app xmlns="uri:oozie:workflow:0.3" name="wf-name">
+<action name="a" retry-max="2" retry-interval="1">
+</action>
+</verbatim>
+
---++ Appendixes
#OozieWFSchema
---+++ Appendix A, Oozie XML-Schema
+---++++ Oozie Schema Version 0.3
+<verbatim>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:workflow="uri:oozie:workflow:0.3"
+ elementFormDefault="qualified" targetNamespace="uri:oozie:workflow:0.3">
+
+ <xs:element name="workflow-app" type="workflow:WORKFLOW-APP"/>
+
+ <xs:simpleType name="IDENTIFIER">
+ <xs:restriction base="xs:string">
+ <xs:pattern value="([a-zA-Z_]([\-_a-zA-Z0-9])*){1,39}"/>
+ </xs:restriction>
+ </xs:simpleType>
+
+ <xs:complexType name="WORKFLOW-APP">
+ <xs:sequence>
+ <xs:element name="credentials" type="workflow:CREDENTIALS" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="start" type="workflow:START" minOccurs="1" maxOccurs="1"/>
+ <xs:choice minOccurs="0" maxOccurs="unbounded">
+ <xs:element name="decision" type="workflow:DECISION" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="fork" type="workflow:FORK" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="join" type="workflow:JOIN" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="kill" type="workflow:KILL" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="action" type="workflow:ACTION" minOccurs="1" maxOccurs="1"/>
+ </xs:choice>
+ <xs:element name="end" type="workflow:END" minOccurs="1" maxOccurs="1"/>
+ <xs:any namespace="uri:oozie:sla:0.1" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="START">
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="END">
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="DECISION">
+ <xs:sequence>
+ <xs:element name="switch" type="workflow:SWITCH" minOccurs="1" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:element name="switch" type="workflow:SWITCH"/>
+
+ <xs:complexType name="SWITCH">
+ <xs:sequence>
+ <xs:sequence>
+ <xs:element name="case" type="workflow:CASE" minOccurs="1" maxOccurs="unbounded"/>
+ <xs:element name="default" type="workflow:DEFAULT" minOccurs="1" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="CASE">
+ <xs:simpleContent>
+ <xs:extension base="xs:string">
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:extension>
+ </xs:simpleContent>
+ </xs:complexType>
+
+ <xs:complexType name="DEFAULT">
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="FORK_TRANSITION">
+ <xs:attribute name="start" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="FORK">
+ <xs:sequence>
+ <xs:element name="path" type="workflow:FORK_TRANSITION" minOccurs="2" maxOccurs="unbounded"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <