Skip to content
This repository has been archived by the owner on Jan 6, 2018. It is now read-only.

Commit

Permalink
Closes OOZIE-123 CoordKillXCommand command uniqueness and increase pr…
Browse files Browse the repository at this point in the history
…iority
  • Loading branch information
Angelo Kaichen Huang authored and Mohammad Kamrul Islam committed Aug 22, 2011
1 parent 0eb99a9 commit 65baec9
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class CoordKillXCommand extends KillTransitionXCommand {
private CoordinatorJob.Status prevStatus = null;

public CoordKillXCommand(String id) {
super("coord_kill", "coord_kill", 1);
super("coord_kill", "coord_kill", 2);
this.jobId = ParamChecker.notEmpty(id, "id");
}

Expand Down Expand Up @@ -154,7 +154,6 @@ public void notifyParent() throws CommandException {
@Override
public void updateJob() throws CommandException {
try {
coordJob.setEndTime(new Date());
jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
}
catch (JPAExecutorException ex) {
Expand All @@ -170,4 +169,12 @@ public Job getJob() {
return coordJob;
}

/* (non-Javadoc)
* @see org.apache.oozie.command.XCommand#getKey()
*/
@Override
public String getKey(){
return getName() + "_" + jobId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,13 @@ else if (context.status == Status.SUSPENDED) {
*/
public NodeDef getNodeDef(String executionPath) {
NodeInstance nodeJob = executionPaths.get(executionPath);
NodeDef nodeDef = null;
if (nodeJob == null) {
log.error("invalid execution path [{0}]", executionPath);
} else {
nodeDef = def.getNode(nodeJob.nodeName);
}
NodeDef nodeDef = def.getNode(nodeJob.nodeName);

if (nodeDef == null) {
log.error("invalid transition [{0}]", nodeJob.nodeName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
*/
package org.apache.oozie.command.coord;

import java.util.Arrays;
import java.util.Date;
import java.util.List;

import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
Expand All @@ -24,6 +26,7 @@
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
Expand Down Expand Up @@ -153,15 +156,15 @@ public void testCoordKillForBackwardSupport() throws Exception {

job = jpaService.execute(coordJobGetCmd);
action = jpaService.execute(coordActionGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.SUCCEEDED);
assertEquals(action.getStatus(), CoordinatorAction.Status.RUNNING);
assertEquals(CoordinatorJob.Status.SUCCEEDED, job.getStatus());
assertEquals(CoordinatorAction.Status.RUNNING, action.getStatus());

new CoordKillXCommand(job.getId()).call();

job = jpaService.execute(coordJobGetCmd);
action = jpaService.execute(coordActionGetCmd);
assertEquals(job.getStatus(), CoordinatorJob.Status.KILLED);
assertEquals(action.getStatus(), CoordinatorAction.Status.KILLED);
assertEquals(CoordinatorJob.Status.KILLED, job.getStatus());
assertEquals(CoordinatorAction.Status.KILLED, action.getStatus());
}


Expand Down Expand Up @@ -194,4 +197,61 @@ public void testCoordKillFailed() throws Exception {
}
}

public class MyCoordKillXCommand extends CoordKillXCommand {
long executed = 0;
int wait;

public MyCoordKillXCommand(String jobId, int wait) {
super(jobId);
this.wait = wait;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Type:").append(getType());
sb.append(",Priority:").append(getPriority());
return sb.toString();
}

@Override
protected Void execute() throws CommandException {
try {
Thread.sleep(wait);
}
catch (InterruptedException e) {
}
executed = System.currentTimeMillis();
return null;
}

}

public void testCoordKillXCommandUniqueness() throws Exception {

CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);

final MyCoordKillXCommand callable1 = new MyCoordKillXCommand(job.getId(), 100);
final MyCoordKillXCommand callable2 = new MyCoordKillXCommand(job.getId(), 100);
final MyCoordKillXCommand callable3 = new MyCoordKillXCommand(job.getId(), 100);

List<MyCoordKillXCommand> callables = Arrays.asList(callable1, callable2, callable3);

CallableQueueService queueservice = services.get(CallableQueueService.class);

for (MyCoordKillXCommand c : callables) {
queueservice.queue(c);
}

waitFor(500, new Predicate() {
public boolean evaluate() throws Exception {
return callable1.executed != 0 && callable2.executed == 0 && callable3.executed == 0;
}
});

assertTrue(callable1.executed != 0);
assertTrue(callable2.executed == 0);
assertTrue(callable3.executed == 0);
}

}
1 change: 1 addition & 0 deletions release-log.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
-- Oozie 3.1.0 release

OOZIE-123 CoordKillXCommand command uniqueness and increase priority
OOZIE-9 (Apache) Launcher job should be able to run in different queue than job itself
OOZIE-20 add sql scripts for oozie 3.1.0
OOZIE-135 support multiple shared lib path in oozie
Expand Down

0 comments on commit 65baec9

Please sign in to comment.