Skip to content
This repository has been archived by the owner on Jan 6, 2018. It is now read-only.

Commit

Permalink
Closes OOZIE-70 Add application namespace to coord_jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
Angelo Kaichen Huang authored and bansalmayank committed Apr 21, 2011
1 parent eef7a24 commit f65dc41
Show file tree
Hide file tree
Showing 8 changed files with 450 additions and 66 deletions.
337 changes: 291 additions & 46 deletions core/src/main/java/org/apache/oozie/CoordinatorJobBean.java

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/oozie/ErrorCode.java
Expand Up @@ -194,7 +194,7 @@ public enum ErrorCode {
E1316(XLog.STD, "Bundle job can not be unpaused, {0}"),
E1317(XLog.STD, "Invalid bundle job change value {0}, {1}"),
E1318(XLog.STD, "No coord jobs for the bundle=[{0}], fail the bundle"),

E1319(XLog.STD, "Invalid bundle coord job namespace, [{0}]"),

ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;

Expand Down
Expand Up @@ -200,17 +200,11 @@ protected String submit() throws CommandException {

String appXml = readAndValidateXml();
coordJob.setOrigJobXml(appXml);
LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());

if (this.bundleId != null) {
// this coord job is created from bundle
coordJob.setBundleId(this.bundleId);
}
if (this.coordName != null) {
// this coord job is created from bundle
coordJob.setAppName(this.coordName);
}
String appNamespace = readAppNamespace(appXml);
coordJob.setAppNamespace(appNamespace);

LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
appXml = XmlUtils.removeComments(appXml);
initEvaluators();
Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
Expand Down Expand Up @@ -317,6 +311,34 @@ private void validateXml(String xmlContent) throws CoordinatorJobException {
}
}

/**
* Read the application XML schema namespace
*
* @param xmlContent input coordinator xml
* @return app xml namespace
* @throws CoordinatorJobException
*/
private String readAppNamespace(String xmlContent) throws CoordinatorJobException {
try {
Element coordXmlElement = XmlUtils.parseXml(xmlContent);
Namespace ns = coordXmlElement.getNamespace();
if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace "
+ SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
}
if (ns != null) {
return ns.getURI();
}
else {
throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
}
}
catch (JDOMException ex) {
LOG.warn("JDOMException :", ex);
throw new CoordinatorJobException(ErrorCode.E0700, ex.getMessage(), ex);
}
}

/**
* Merge default configuration with user-defined configuration.
*
Expand Down Expand Up @@ -926,6 +948,14 @@ protected void loadState() throws CommandException {
throw new CommandException(ErrorCode.E0610);
}
coordJob = new CoordinatorJobBean();
if (this.bundleId != null) {
// this coord job is created from bundle
coordJob.setBundleId(this.bundleId);
}
if (this.coordName != null) {
// this coord job is created from bundle
coordJob.setAppName(this.coordName);
}
setJob(coordJob);
}

Expand Down
Expand Up @@ -46,6 +46,8 @@ public class SchemaService implements Service {

public static final String SLA_NAME_SPACE_URI = "uri:oozie:sla:0.1";

public static final String COORDINATOR_NAMESPACE_URI_1 = "uri:oozie:coordinator:0.1";

private Schema wfSchema;

private Schema coordSchema;
Expand Down
Expand Up @@ -22,14 +22,34 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.XConfiguration;

public class TestCoordSubmitXCommand extends CoordXTestCase {
public class TestCoordSubmitXCommand extends XDataTestCase {

private Services services;

@Override
protected void setUp() throws Exception {
super.setUp();
services = new Services();
services.init();
cleanUpDBTables();
}

@Override
protected void tearDown() throws Exception {
services.destroy();
super.tearDown();
}

/**
* Basic test
*
Expand Down Expand Up @@ -68,6 +88,91 @@ public void testBasicSubmit() throws Exception {
}
}

/**
* Basic coordinator submit test with bundleId
*
* @throws Exception
*/
public void testBasicSubmitWithBundleId() throws Exception {
Configuration conf = new XConfiguration();
String appPath = getTestCaseDir() + File.separator + "coordinator.xml";
String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:days(1)}\" start=\"2009-02-01T01:00Z\" end=\"2009-02-03T23:59Z\" timezone=\"UTC\" "
+ "xmlns=\"uri:oozie:coordinator:0.2\"> <controls> <concurrency>2</concurrency> "
+ "<execution>LIFO</execution> </controls> <datasets> "
+ "<dataset name=\"a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\" "
+ "timezone=\"UTC\"> <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template> </dataset> "
+ "<dataset name=\"local_a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\" "
+ "timezone=\"UTC\"> <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template> </dataset> "
+ "</datasets> <input-events> "
+ "<data-in name=\"A\" dataset=\"a\"> <instance>${coord:latest(0)}</instance> </data-in> "
+ "</input-events> "
+ "<output-events> <data-out name=\"LOCAL_A\" dataset=\"local_a\"> "
+ "<instance>${coord:current(-1)}</instance> </data-out> </output-events> <action> <workflow> <app-path>hdfs:///tmp/workflows/</app-path> "
+ "<configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> "
+ "<property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> "
+ "</property></configuration> </workflow> </action> </coordinator-app>";
writeToFile(appXml, appPath);
conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
conf.set(OozieClient.USER_NAME, getTestUser());
conf.set(OozieClient.GROUP_NAME, "other");

this.addRecordToBundleActionTable("OOZIE-B", "COORD-NAME", 0, Job.Status.PREP);

CoordSubmitXCommand sc = new CoordSubmitXCommand(conf, "UNIT_TESTING", "OOZIE-B", "COORD-NAME");
String jobId = sc.call();

assertEquals(jobId.substring(jobId.length() - 2), "-C");
CoordinatorJobBean job = checkCoordJobs(jobId);
if (job != null) {
assertEquals("OOZIE-B", job.getBundleId());
assertEquals("COORD-NAME", job.getAppName());
assertEquals("uri:oozie:coordinator:0.2", job.getAppNamespace());
} else {
fail();
}
}

/**
* Basic coordinator submit test from bundle but with wrong namespace
*
* @throws Exception
*/
public void testBasicSubmitWithWrongNamespace() throws Exception {
Configuration conf = new XConfiguration();
String appPath = getTestCaseDir() + File.separator + "coordinator.xml";
String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:days(1)}\" start=\"2009-02-01T01:00Z\" end=\"2009-02-03T23:59Z\" timezone=\"UTC\" "
+ "xmlns=\"uri:oozie:coordinator:0.1\"> <controls> <concurrency>2</concurrency> "
+ "<execution>LIFO</execution> </controls> <datasets> "
+ "<dataset name=\"a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\" "
+ "timezone=\"UTC\"> <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template> </dataset> "
+ "<dataset name=\"local_a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\" "
+ "timezone=\"UTC\"> <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template> </dataset> "
+ "</datasets> <input-events> "
+ "<data-in name=\"A\" dataset=\"a\"> <instance>${coord:latest(0)}</instance> </data-in> "
+ "</input-events> "
+ "<output-events> <data-out name=\"LOCAL_A\" dataset=\"local_a\"> "
+ "<instance>${coord:current(-1)}</instance> </data-out> </output-events> <action> <workflow> <app-path>hdfs:///tmp/workflows/</app-path> "
+ "<configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> "
+ "<property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> "
+ "</property></configuration> </workflow> </action> </coordinator-app>";
writeToFile(appXml, appPath);
conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
conf.set(OozieClient.USER_NAME, getTestUser());
conf.set(OozieClient.GROUP_NAME, "other");

this.addRecordToBundleActionTable("OOZIE-B", "COORD-NAME", 0, Job.Status.PREP);

CoordSubmitXCommand sc = new CoordSubmitXCommand(conf, "UNIT_TESTING", "OOZIE-B", "COORD-NAME");
try {
sc.call();
fail("Exception expected because namespace is too old when submit coordinator through bundle!");
}
catch (CommandException e) {
// should come here for namespace errors
}

}

/**
* Basic test
*
Expand Down Expand Up @@ -398,6 +503,7 @@ public void testAvailConfigDefaults() throws Exception {
*/
private CoordinatorJobBean checkCoordJobs(String jobId) {
try {
JPAService jpaService = Services.get().get(JPAService.class);
CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
return job;
}
Expand Down
16 changes: 8 additions & 8 deletions core/src/test/java/org/apache/oozie/test/XDataTestCase.java
Expand Up @@ -683,15 +683,15 @@ protected BundleJobBean addRecordToBundleJobTableNegative(Job.Status jobStatus)
* Create bundle action bean and save to db
*
* @param jobId bundle job id
* @param actionId bnudle action id
* @param coordName coordinator name
* @param pending true if action is pending
* @param status job status
* @return bundle action bean
* @throws Exception
*/
protected BundleActionBean addRecordToBundleActionTable(String jobId, String actionId, int pending,
protected BundleActionBean addRecordToBundleActionTable(String jobId, String coordName, int pending,
Job.Status status) throws Exception {
BundleActionBean action = createBundleAction(jobId, actionId, pending, status);
BundleActionBean action = createBundleAction(jobId, coordName, pending, status);

try {
JPAService jpaService = Services.get().get(JPAService.class);
Expand All @@ -712,20 +712,20 @@ protected BundleActionBean addRecordToBundleActionTable(String jobId, String act
* Create bundle action bean
*
* @param jobId bundle job id
* @param actionId bnudle action id
* @param coordName coordinator name
* @param pending true if action is pending
* @param status job status
* @return bundle action bean
* @throws Exception
*/
protected BundleActionBean createBundleAction(String jobId, String actionId, int pending, Job.Status status)
protected BundleActionBean createBundleAction(String jobId, String coordName, int pending, Job.Status status)
throws Exception {
BundleActionBean action = new BundleActionBean();
action.setBundleId(jobId);
action.setBundleActionId(jobId + "_" + actionId);
action.setBundleActionId(jobId + "_" + coordName);
action.setPending(pending);
action.setCoordId(actionId);
action.setCoordName(actionId);
action.setCoordId(coordName);
action.setCoordName(coordName);
action.setStatus(status);
action.setLastModifiedTime(new Date());

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/resources/coord-job-bundle.xml
Expand Up @@ -14,7 +14,7 @@
-->
<coordinator-app name="COORD-TEST" frequency="${coord:days(1)}"
start="${START_TIME}" end="${END_TIME}" timezone="UTC"
xmlns="uri:oozie:coordinator:0.1">
xmlns="uri:oozie:coordinator:0.2">
<controls>
<concurrency>2</concurrency>
<execution>LIFO</execution>
Expand Down
1 change: 1 addition & 0 deletions release-log.txt
Expand Up @@ -2,6 +2,7 @@

OOZIE-62 client command error when bundle rerun with -action.
GH-0566 If Java Main exit code !=0, LauncherMapper should the exit code as the error code
OOZIE-70 Add application namespace to coord_jobs
OOZIE-69 KillTransitionXCommand should update job after kill children
OOZIE-68 rename wf job column parentId to parent_id
OOZIE-64 Fix coordinator el function actualTime()
Expand Down

0 comments on commit f65dc41

Please sign in to comment.