Permalink
Browse files

Closes OOZIE-115 oozie should check if app path is directory and read…

… the app xml under it
  • Loading branch information...
1 parent a4d9e47 commit efc7c4ad02a8db73131ca7607a6d87361ead1cc6 Angelo Kaichen Huang committed with bansalmayank Jun 8, 2011
@@ -387,10 +387,15 @@ protected String getLauncherMain(Configuration launcherConf, Element actionXml)
}
@SuppressWarnings("unchecked")
- JobConf createLauncherConf(Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
+ JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
throws ActionExecutorException {
try {
- Path appPathRoot = new Path(context.getWorkflow().getAppPath()).getParent();
+
+ // app path could be a file
+ Path appPathRoot = new Path(context.getWorkflow().getAppPath());
+ if (actionFs.isFile(appPathRoot)) {
+ appPathRoot = appPathRoot.getParent();
+ }
// launcher job configuration
Configuration launcherConf = createBaseHadoopConf(context, actionXml);
@@ -473,11 +478,17 @@ void injectLauncherCallback(Context context, Configuration launcherConf) {
injectCallback(context, launcherConf);
}
- public void submitLauncher(Context context, WorkflowAction action) throws ActionExecutorException {
+ public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
JobClient jobClient = null;
boolean exception = false;
try {
- Path appPathRoot = new Path(context.getWorkflow().getAppPath()).getParent();
+ Path appPathRoot = new Path(context.getWorkflow().getAppPath());
+
+ // app path could be a file
+ if (actionFs.isFile(appPathRoot)) {
+ appPathRoot = appPathRoot.getParent();
+ }
+
Element actionXml = XmlUtils.parseXml(action.getConf());
// action job configuration
@@ -510,7 +521,7 @@ public void submitLauncher(Context context, WorkflowAction action) throws Action
}
}
- JobConf launcherJobConf = createLauncherConf(context, action, actionXml, actionConf);
+ JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
injectLauncherCallback(context, launcherJobConf);
XLog.getLog(getClass()).debug("Creating Job Client for action " + action.getId());
jobClient = createJobClient(context, launcherJobConf);
@@ -592,7 +603,7 @@ private boolean needInjectCredentials() {
}
catch (ClassNotFoundException ex) {
methodExists = false;
- }
+ }
catch (NoSuchMethodException ex) {
methodExists = false;
}
@@ -722,7 +733,7 @@ public void start(Context context, WorkflowAction action) throws ActionExecutorE
XLog.getLog(getClass()).debug("Preparing action Dir through copying " + context.getActionDir());
prepareActionDir(actionFs, context);
XLog.getLog(getClass()).debug("Action Dir is ready. Submitting the action ");
- submitLauncher(context, action);
+ submitLauncher(actionFs, context, action);
XLog.getLog(getClass()).debug("Action submit completed. Performing check ");
check(context, action);
XLog.getLog(getClass()).debug("Action check is done after submission");
@@ -261,14 +261,22 @@ protected void eagerVerifyPrecondition() throws CommandException, PreconditionEx
* @throws CommandException thrown if failed to merge configuration
*/
protected void mergeDefaultConfig() throws CommandException {
- Path appPath = new Path(conf.get(OozieClient.BUNDLE_APP_PATH));
- Path configDefault = new Path(appPath.getParent(), CONFIG_DEFAULT);
- FileSystem fs;
+ Path configDefault = null;
try {
+ String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
+ Path bundleAppPath = new Path(bundleAppPathStr);
String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
- fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, configDefault.toUri(),
+ FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, bundleAppPath.toUri(),
new Configuration());
+
+ // app path could be a directory
+ if (!fs.isFile(bundleAppPath)) {
+ configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
+ } else {
+ configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
+ }
+
if (fs.exists(configDefault)) {
Configuration defaultConf = new XConfiguration(fs.open(configDefault));
PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
@@ -321,9 +329,17 @@ protected String readDefinition(String appPath) throws BundleJobException {
LOG.debug("user =" + user + " group =" + group);
FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri,
new Configuration());
- Path p = new Path(uri.getPath());
+ Path appDefPath = null;
+
+ // app path could be a directory
+ Path path = new Path(uri.getPath());
+ if (!fs.isFile(path)) {
+ appDefPath = new Path(path, BUNDLE_XML_FILE);
+ } else {
+ appDefPath = path;
+ }
- Reader reader = new InputStreamReader(fs.open(p));
+ Reader reader = new InputStreamReader(fs.open(appDefPath));
StringWriter writer = new StringWriter();
IOUtils.copyCharStream(reader, writer);
return writer.toString();
@@ -345,15 +345,22 @@ private String readAppNamespace(String xmlContent) throws CoordinatorJobExceptio
* @throws CommandException thrown if failed to read or merge configurations
*/
protected void mergeDefaultConfig() throws CommandException {
- Path coordAppDir = new Path(conf.get(OozieClient.COORDINATOR_APP_PATH)).getParent();
- Path configDefault = new Path(coordAppDir, CONFIG_DEFAULT);
- Path appPath = new Path(conf.get(OozieClient.COORDINATOR_APP_PATH));
- // Configuration fsConfig = CoordUtils.getHadoopConf(conf);
+ Path configDefault = null;
try {
+ String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
+ Path coordAppPath = new Path(coordAppPathStr);
String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group,
- configDefault.toUri(), new Configuration());
+ coordAppPath.toUri(), new Configuration());
+
+ // app path could be a directory
+ if (!fs.isFile(coordAppPath)) {
+ configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
+ } else {
+ configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
+ }
+
if (fs.exists(configDefault)) {
Configuration defaultConf = new XConfiguration(fs.open(configDefault));
PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
@@ -869,9 +876,17 @@ protected String readDefinition(String appPath) throws CoordinatorJobException {
LOG.debug("user =" + user + " group =" + group);
FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri,
new Configuration());
- Path p = new Path(uri.getPath());
+ Path appDefPath = null;
+
+ // app path could be a directory
+ Path path = new Path(uri.getPath());
+ if (!fs.isFile(path)) {
+ appDefPath = new Path(path, COORDINATOR_XML_FILE);
+ } else {
+ appDefPath = path;
+ }
- Reader reader = new InputStreamReader(fs.open(p));
+ Reader reader = new InputStreamReader(fs.open(appDefPath));
StringWriter writer = new StringWriter();
IOUtils.copyCharStream(reader, writer);
return writer.toString();
@@ -15,6 +15,8 @@
package org.apache.oozie.command.wf;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -112,10 +114,18 @@ protected Void execute() throws CommandException {
XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
- Path configDefault = new Path(new Path(conf.get(OozieClient.APP_PATH)).getParent(),
- SubmitCommand.CONFIG_DEFAULT);
+ URI uri = new URI(conf.get(OozieClient.APP_PATH));
FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(wfBean.getUser(),
- wfBean.getGroup(), configDefault.toUri(), new Configuration());
+ wfBean.getGroup(), uri, new Configuration());
+
+ Path configDefault = null;
+ // app path could be a directory
+ Path path = new Path(uri.getPath());
+ if (!fs.isFile(path)) {
+ configDefault = new Path(path, SubmitCommand.CONFIG_DEFAULT);
+ } else {
+ configDefault = new Path(path.getParent(), SubmitCommand.CONFIG_DEFAULT);
+ }
if (fs.exists(configDefault)) {
Configuration defaultConf = new XConfiguration(fs.open(configDefault));
@@ -140,8 +150,11 @@ protected Void execute() throws CommandException {
catch (IOException ex) {
throw new CommandException(ErrorCode.E0803, ex);
}
- catch (HadoopAccessorException e) {
- throw new CommandException(e);
+ catch (HadoopAccessorException ex) {
+ throw new CommandException(ex);
+ }
+ catch (URISyntaxException ex) {
+ throw new CommandException(ErrorCode.E0711, ex.getMessage(), ex);
}
try {
@@ -56,6 +56,7 @@
import java.util.Set;
import java.util.HashSet;
import java.io.IOException;
+import java.net.URI;
public class SubmitXCommand extends WorkflowXCommand<String> {
public static final String CONFIG_DEFAULT = "config-default.xml";
@@ -95,11 +96,20 @@ protected String execute() throws CommandException {
XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
- Path configDefault = new Path(new Path(conf.get(OozieClient.APP_PATH)).getParent(), CONFIG_DEFAULT);
String user = conf.get(OozieClient.USER_NAME);
String group = conf.get(OozieClient.GROUP_NAME);
- FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group,
- configDefault.toUri(), new Configuration());
+ URI uri = new URI(conf.get(OozieClient.APP_PATH));
+ FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user,
+ group, uri, new Configuration());
+
+ Path configDefault = null;
+ // app path could be a directory
+ Path path = new Path(uri.getPath());
+ if (!fs.isFile(path)) {
+ configDefault = new Path(path, SubmitCommand.CONFIG_DEFAULT);
+ } else {
+ configDefault = new Path(path.getParent(), SubmitCommand.CONFIG_DEFAULT);
+ }
if (fs.exists(configDefault)) {
try {
@@ -216,7 +226,7 @@ private void writeSLARegistration(String slaXml, String id, String user, String
/**
* Resolve variables in sla xml element.
- *
+ *
* @param eSla sla xml element
* @param evalSla sla evaluator
* @return sla xml string after evaluation
@@ -238,7 +248,7 @@ public static String resolveSla(Element eSla, ELEvaluator evalSla) throws Comman
/**
* Create an EL evaluator for a given group.
- *
+ *
* @param conf configuration variable
* @param group group variable
* @return the evaluator created for the group
@@ -101,7 +101,14 @@ protected String readDefinition(String appPath, String user, String group, Strin
URI uri = new URI(appPath);
FileSystem fs = Services.get().get(HadoopAccessorService.class).
createFileSystem(user, group, uri, new Configuration());
- Reader reader = new InputStreamReader(fs.open(new Path(uri.getPath())));
+
+ // app path could be a directory
+ Path path = new Path(uri.getPath());
+ if (!fs.isFile(path)) {
+ path = new Path(path, "workflow.xml");
+ }
+
+ Reader reader = new InputStreamReader(fs.open(path));
StringWriter writer = new StringWriter();
IOUtils.copyCharStream(reader, writer);
return writer.toString();
@@ -156,12 +163,18 @@ public XConfiguration createProtoActionConf(Configuration jobConf, String authTo
List<String> filePaths;
if (isWorkflowJob) {
- filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
+ // app path could be a directory
+ Path path = new Path(uri.getPath());
+ if (!fs.isFile(path)) {
+ filePaths = getLibFiles(fs, new Path(appPath + "/lib"));
+ } else {
+ filePaths = getLibFiles(fs, new Path(appPath.getParent(), "lib"));
+ }
}
else {
filePaths = new ArrayList<String>();
}
-
+
if (jobConf.get(OozieClient.LIBPATH) != null) {
Path libPath = new Path(jobConf.get(OozieClient.LIBPATH));
List<String> libPaths = getLibFiles(fs, libPath);
@@ -19,7 +19,6 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.ErrorCode;
@@ -76,14 +75,6 @@ public static void normalizeAppPath(String user, String group, Configuration con
throw new IOException("Error: " + appPathStr + " does not exist");
}
- FileStatus fileStatus = fs.getFileStatus(appPath);
- Path appXml = appPath;
- // Normalize appPath here - it will always point to a workflow/coordinator/bundle xml definition file;
- if (fileStatus.isDir()) {
- appXml = new Path(appPath, (wfPathStr != null)? "workflow.xml" : (coordPathStr != null ? "coordinator.xml" : "bundle.xml"));
- normalizedAppPathStr = appXml.toString();
- }
-
if (wfPathStr != null) {
conf.set(OozieClient.APP_PATH, normalizedAppPathStr);
}
@@ -98,7 +89,7 @@ else if (bundlePathStr != null) {
/**
* This Function will parse the value of the changed values in key value manner. the change value would be
* key1=value1;key2=value2
- *
+ *
* @param changeValue change value.
* @return This returns the hash with hash<[key1,value1],[key2,value2]>
* @throws CommandException thrown if changeValue cannot be parsed properly.
@@ -41,8 +41,10 @@
import org.apache.oozie.workflow.lite.StartNodeDef;
import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.StringReader;
+import java.io.Writer;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.HashMap;
@@ -201,6 +203,12 @@ protected XConfiguration getBaseProtoConf() {
*/
protected WorkflowJobBean createBaseWorkflow(XConfiguration protoConf, String actionName) throws Exception {
Path appUri = new Path(getAppPath(), "workflow.xml");
+
+ String content = "<workflow-app xmlns='uri:oozie:workflow:0.1' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>";
+ content += "<start to='end' />";
+ content += "<end name='end' /></workflow-app>";
+ writeToFile(content, getAppPath(), "workflow.xml");
+
WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", new StartNodeDef("end"))
.addNode(new EndNodeDef("end"));
XConfiguration wfConf = new XConfiguration();
@@ -233,6 +241,8 @@ protected WorkflowJobBean createBaseWorkflowWithCredentials(XConfiguration proto
Reader reader = IOUtils.getResourceAsReader("wf-credentials.xml", -1);
String wfxml = IOUtils.getReaderAsString(reader, -1);
+ writeToFile(wfxml, getAppPath(), "workflow.xml");
+
WorkflowApp app = new LiteWorkflowApp("test-wf-cred", wfxml, new StartNodeDef("start")).addNode(new EndNodeDef(
"end"));
XConfiguration wfConf = new XConfiguration();
@@ -273,4 +283,11 @@ private WorkflowJobBean createWorkflow(WorkflowApp app, Configuration conf, XCon
return workflow;
}
+ private void writeToFile(String content, Path appPath, String fileName) throws IOException {
+ FileSystem fs = getFileSystem();
+ Writer writer = new OutputStreamWriter(fs.create(new Path(appPath, fileName), true));
+ writer.write(content);
+ writer.close();
+ }
+
}
@@ -240,7 +240,7 @@ public void testSetupMethods() throws Exception {
ae.setupActionConf(actionConf, context, actionXml, getFsTestCaseDir());
- conf = ae.createLauncherConf(context, action, actionXml, actionConf);
+ conf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, actionConf);
ae.setupLauncherConf(conf, actionXml, getFsTestCaseDir(), context);
assertEquals("MAIN-CLASS", ae.getLauncherMain(conf, actionXml));
assertTrue(conf.get("mapred.child.java.opts").contains("JAVA-OPTS"));
@@ -283,7 +283,7 @@ private RunningJob submitAction(Context context) throws Exception {
WorkflowAction action = context.getAction();
ae.prepareActionDir(getFileSystem(), context);
- ae.submitLauncher(context, action);
+ ae.submitLauncher(getFileSystem(), context, action);
String jobId = action.getExternalId();
String jobTracker = action.getTrackerUri();
Oops, something went wrong.

0 comments on commit efc7c4a

Please sign in to comment.