Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Closes OOZIE-11 Adding DistCp Action

  • Loading branch information...
commit 9b72156507eb29adf44abc076ce35d00edbcb412 1 parent 4396fce
@bansalmayank bansalmayank authored Mohammad Kamrul Islam committed
View
2  client/src/main/java/org/apache/oozie/cli/OozieCLI.java
@@ -1189,6 +1189,8 @@ private void validateCommand(CommandLine commandLine) throws OozieCLIException {
"oozie-workflow-0.1.xsd")));
sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream(
"email-action-0.1.xsd")));
+ sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream(
+ "distcp-action-0.1.xsd")));
SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
Schema schema = factory.newSchema(sources.toArray(new StreamSource[sources.size()]));
Validator validator = schema.newValidator();
View
62 client/src/main/resources/distcp-action-0.1.xsd
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ xmlns:distcp="uri:oozie:distcp-action:0.1" elementFormDefault="qualified"
+ targetNamespace="uri:oozie:distcp-action:0.1">
+
+ <xs:element name="distcp" type="distcp:ACTION"/>
+
+ <xs:complexType name="ACTION">
+ <xs:sequence>
+ <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="prepare" type="distcp:PREPARE" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="configuration" type="distcp:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="java-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="CONFIGURATION">
+ <xs:sequence>
+ <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="PREPARE">
+ <xs:sequence>
+ <xs:element name="delete" type="distcp:DELETE" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="mkdir" type="distcp:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="DELETE">
+ <xs:attribute name="path" type="xs:string" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="MKDIR">
+ <xs:attribute name="path" type="xs:string" use="required"/>
+ </xs:complexType>
+
+</xs:schema>
View
85 core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
@@ -0,0 +1,85 @@
+/**
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.XLog;
+import org.jdom.Element;
+
+
+public class DistcpActionExecutor extends JavaActionExecutor{
+ public static final String CONF_OOZIE_DISTCP_ACTION_MAIN_CLASS = "org.apache.hadoop.tools.DistCp";
+ public static final String CLASS_NAMES = "oozie.actions.main.classnames";
+ private static final XLog LOG = XLog.getLog(DistcpActionExecutor.class);
+ public static final String DISTCP_TYPE = "distcp";
+
+ public DistcpActionExecutor() {
+ super("distcp");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.action.hadoop.JavaActionExecutor#getLauncherMain(org.apache.hadoop.conf.Configuration, org.jdom.Element)
+ */
+ @Override
+ protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
+ String classNameDistcp = CONF_OOZIE_DISTCP_ACTION_MAIN_CLASS;
+ String name = getClassNamebyType(DISTCP_TYPE);
+ if(name != null){
+ classNameDistcp = name;
+ }
+ return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, classNameDistcp);
+ }
+
+ /**
+ * This function returns the Action classes names from the configuration
+ *
+ * @param type This is type of the action classes
+ * @return Name of the class from the configuration
+ */
+ public static String getClassNamebyType(String type){
+ Configuration conf = Services.get().getConf();
+ String classname = null;
+ if (conf.get(CLASS_NAMES, "").trim().length() > 0) {
+ for (String function : conf.getStrings(CLASS_NAMES)) {
+ function = DistcpActionExecutor.Trim(function);
+ LOG.debug("class for Distcp Action: " + function);
+ String[] str = function.split("=");
+ if (str.length > 0) {
+ if(type.equalsIgnoreCase(str[0])){
+ classname = new String(str[1]);
+ }
+ }
+ }
+ }
+ return classname;
+ }
+
+ /**
+ * To trim string
+ *
+ * @param str
+ * @return trim string
+ */
+ public static String Trim(String str) {
+ if (str != null) {
+ str = str.replaceAll("\\n", "");
+ str = str.replaceAll("\\t", "");
+ str = str.trim();
+ }
+ return str;
+ }
+}
View
9 core/src/main/resources/oozie-default.xml
@@ -1294,6 +1294,15 @@
</description>
</property>
+ <property>
+ <name>oozie.actions.main.classnames</name>
+ <value>distcp=org.apache.hadoop.tools.DistCp</value>
+ <description>
+ A list of class name mapping for Action classes
+ </description>
+ </property>
+
+
<property>
<name>oozie.service.WorkflowAppService.system.libpath</name>
<value>/user/${user.name}/share/lib</value>
View
99 core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
@@ -0,0 +1,99 @@
+package org.apache.oozie.action.hadoop;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+
+public class TestDistCpActionExecutor extends ActionExecutorTestCase{
+
+ @Override
+ protected void setSystemProps() {
+ super.setSystemProps();
+ setSystemProperty("oozie.service.ActionService.executor.classes", DistcpActionExecutor.class.getName());
+ }
+
+ public void testSimpestdistCpSubmitOK() throws Exception {
+ String actionXml = "<distcp>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<arg>-Dmyqueue</arg>" +
+ "<arg>input</arg>"+
+ "<arg>output</arg>" +
+ "</distcp>";
+ Context context = createContext(actionXml);
+ final RunningJob runningJob = submitAction(context);
+ waitFor(60 * 1000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ return runningJob.isComplete();
+ }
+ });
+ assertTrue(runningJob.isSuccessful());
+ }
+
+
+ protected Context createContext(String actionXml) throws Exception {
+ DistcpActionExecutor ae = new DistcpActionExecutor();
+
+ Path appJarPath = new Path("lib/test.jar");
+ File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", LauncherMainTester.class);
+ InputStream is = new FileInputStream(jarFile);
+ OutputStream os = getFileSystem().create(new Path(getAppPath(), "lib/test.jar"));
+ IOUtils.copyStream(is, os);
+
+ Path appSoPath = new Path("lib/test.so");
+ getFileSystem().create(new Path(getAppPath(), appSoPath)).close();
+
+ XConfiguration protoConf = new XConfiguration();
+ protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+ protoConf.set(WorkflowAppService.HADOOP_UGI, getTestUser() + "," + getTestGroup());
+ protoConf.set(OozieClient.GROUP_NAME, getTestGroup());
+ protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, appJarPath.toString(), appSoPath.toString());
+ injectKerberosInfo(protoConf);
+
+ WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
+ WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
+ action.setType(ae.getType());
+ action.setConf(actionXml);
+
+ return new Context(wf, action);
+ }
+
+
+ protected RunningJob submitAction(Context context) throws Exception {
+ DistcpActionExecutor ae = new DistcpActionExecutor();
+
+ WorkflowAction action = context.getAction();
+
+ ae.prepareActionDir(getFileSystem(), context);
+ ae.submitLauncher(getFileSystem(), context, action);
+
+ String jobId = action.getExternalId();
+ String jobTracker = action.getTrackerUri();
+ String consoleUrl = action.getConsoleUrl();
+ assertNotNull(jobId);
+ assertNotNull(jobTracker);
+ assertNotNull(consoleUrl);
+
+ JobConf jobConf = new JobConf();
+ jobConf.set("mapred.job.tracker", jobTracker);
+ injectKerberosInfo(jobConf);
+ JobClient jobClient = new JobClient(jobConf);
+ final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
+ assertNotNull(runningJob);
+ return runningJob;
+ }
+}
View
42 core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -14,31 +14,43 @@
*/
package org.apache.oozie.action.hadoop;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.io.Text;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
-import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
+import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.util.IOUtils;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
@@ -47,18 +59,6 @@
import org.apache.oozie.workflow.lite.StartNodeDef;
import org.jdom.Element;
-import java.io.File;
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.FileInputStream;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-
public class TestJavaActionExecutor extends ActionExecutorTestCase {
@Override
@@ -250,7 +250,7 @@ public void testSetupMethods() throws Exception {
}
- private Context createContext(String actionXml) throws Exception {
+ protected Context createContext(String actionXml) throws Exception {
JavaActionExecutor ae = new JavaActionExecutor();
Path appJarPath = new Path("lib/test.jar");
@@ -277,7 +277,7 @@ private Context createContext(String actionXml) throws Exception {
return new Context(wf, action);
}
- private RunningJob submitAction(Context context) throws Exception {
+ protected RunningJob submitAction(Context context) throws Exception {
JavaActionExecutor ae = new JavaActionExecutor();
WorkflowAction action = context.getAction();
View
3  release-log.txt
@@ -1,8 +1,9 @@
-- Oozie 3.1.0 release
+OOZIE-11 Adding Distcp Action.
OOZIE-6 Custom filters option and User information column added to Coordinator jobs section of Oozie Web Console
OOZIE-124 Update documentation for job-type in WS API
-OOZIE-81 Add an email action to Oozie
+OOZIE-81 Add an email action to Oozie
OOZIE-113 merge changes for OOZIE-101 to ActionEndXCommand
OOZIE-112 workflow kill node message is not resolved and set it to action error message
OOZIE-111 adjust configuration for DBCP data source
Please sign in to comment.
Something went wrong with that request. Please try again.