Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented test for issue UnsupportedOperationException when calling schedulerapi.killTask #2749 #2781

Merged
merged 1 commit into from
Feb 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,17 @@
*/
package functionaltests;

import com.google.common.io.Files;
import functionaltests.jobs.ErrorTask;
import functionaltests.jobs.LogTask;
import functionaltests.jobs.MetadataTask;
import functionaltests.jobs.SimpleJob;
import functionaltests.jobs.VariableTask;
import static functionaltests.RestFuncTHelper.getRestServerUrl;
import static functionaltests.jobs.SimpleJob.TEST_JOB;

import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Stack;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.io.IOUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand All @@ -54,24 +59,35 @@
import org.ow2.proactive.scheduler.common.SchedulerEvent;
import org.ow2.proactive.scheduler.common.SchedulerEventListener;
import org.ow2.proactive.scheduler.common.SchedulerStatus;
import org.ow2.proactive.scheduler.common.job.*;
import org.ow2.proactive.scheduler.common.task.*;
import org.ow2.proactive.scheduler.common.job.Job;
import org.ow2.proactive.scheduler.common.job.JobId;
import org.ow2.proactive.scheduler.common.job.JobInfo;
import org.ow2.proactive.scheduler.common.job.JobResult;
import org.ow2.proactive.scheduler.common.job.JobState;
import org.ow2.proactive.scheduler.common.job.JobStatus;
import org.ow2.proactive.scheduler.common.job.TaskFlowJob;
import org.ow2.proactive.scheduler.common.job.UserIdentification;
import org.ow2.proactive.scheduler.common.task.ForkEnvironment;
import org.ow2.proactive.scheduler.common.task.ScriptTask;
import org.ow2.proactive.scheduler.common.task.TaskId;
import org.ow2.proactive.scheduler.common.task.TaskInfo;
import org.ow2.proactive.scheduler.common.task.TaskResult;
import org.ow2.proactive.scheduler.common.task.TaskState;
import org.ow2.proactive.scheduler.common.task.TaskStatus;
import org.ow2.proactive.scheduler.rest.ISchedulerClient;
import org.ow2.proactive.scheduler.rest.SchedulerClient;
import org.ow2.proactive.scheduler.task.exceptions.TaskException;
import org.ow2.proactive.scripting.SimpleScript;
import org.ow2.proactive.scripting.TaskScript;

import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Stack;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.io.Files;

import static functionaltests.RestFuncTHelper.getRestServerUrl;
import static functionaltests.jobs.SimpleJob.TEST_JOB;
import functionaltests.jobs.ErrorTask;
import functionaltests.jobs.LogTask;
import functionaltests.jobs.MetadataTask;
import functionaltests.jobs.NonTerminatingJob;
import functionaltests.jobs.SimpleJob;
import functionaltests.jobs.VariableTask;


public class SchedulerClientTest extends AbstractRestFuncTestCase {
Expand Down Expand Up @@ -123,6 +139,7 @@ public void testWaitForTerminatingJob() throws Exception {
client.waitForJob(jobId, TimeUnit.MINUTES.toMillis(3));
}


@Test(timeout = MAX_WAIT_TIME)
public void testJobResult() throws Throwable {
ISchedulerClient client = clientInstance();
Expand Down Expand Up @@ -150,39 +167,39 @@ public void testJobResult() throws Throwable {

checkJobInfo(jobState.getJobInfo());

TaskState errorTaskState = findTask(ErrorTask.class.getSimpleName() + "Task", jobState.getHMTasks());
TaskState errorTaskState = findTask(getTaskNameForClass(ErrorTask.class), jobState.getHMTasks());
Assert.assertNotNull(errorTaskState);
TaskState simpleTaskState = findTask(SimpleJob.class.getSimpleName() + "Task", jobState.getHMTasks());
TaskState simpleTaskState = findTask(getTaskNameForClass(SimpleJob.class), jobState.getHMTasks());
Assert.assertNotNull(simpleTaskState);
Assert.assertEquals(TaskStatus.FAULTY, errorTaskState.getStatus());
Assert.assertEquals(TaskStatus.FINISHED, simpleTaskState.getStatus());

// task result simple
TaskResult tResSimple = result.getResult(SimpleJob.class.getSimpleName() + "Task");
TaskResult tResSimple = result.getResult(getTaskNameForClass(SimpleJob.class));
Assert.assertNotNull(tResSimple.value());
Assert.assertEquals(new StringWrapper(TEST_JOB), tResSimple.value());

// task result with error
TaskResult tResError = result.getResult(ErrorTask.class.getSimpleName() + "Task");
TaskResult tResError = result.getResult(getTaskNameForClass(ErrorTask.class));
Assert.assertNotNull(tResError);
Assert.assertTrue(tResError.hadException());
Assert.assertNotNull(tResError.getException());
Assert.assertTrue(tResError.getException() instanceof TaskException);

// task result with logs
TaskResult tResLog = result.getResult(LogTask.class.getSimpleName() + "Task");
TaskResult tResLog = result.getResult(getTaskNameForClass(LogTask.class));
Assert.assertNotNull(tResLog);
Assert.assertNotNull(tResLog.getOutput());
System.err.println(tResLog.getOutput().getStdoutLogs(false));
Assert.assertTrue(tResLog.getOutput().getStdoutLogs(false).contains(LogTask.HELLO_WORLD));

// task result with variables
TaskResult tResVar = result.getResult(VariableTask.class.getSimpleName() + "Task");
TaskResult tResVar = result.getResult(getTaskNameForClass(VariableTask.class));
Assert.assertNotNull(tResVar.getPropagatedVariables());
Assert.assertTrue(tResVar.getPropagatedVariables().containsKey(VariableTask.MYVAR));

// task result with metadata
TaskResult tMetaVar = result.getResult(MetadataTask.class.getSimpleName() + "Task");
TaskResult tMetaVar = result.getResult(getTaskNameForClass(MetadataTask.class));
Assert.assertNotNull(tMetaVar.getMetadata());
Assert.assertTrue(tMetaVar.getMetadata().containsKey(MetadataTask.MYVAR));

Expand Down Expand Up @@ -314,6 +331,26 @@ public void testJobSubmissionEventListener() throws Exception {
client.waitForJob(jobId, TimeUnit.SECONDS.toMillis(120));
}

@Test(timeout = MAX_WAIT_TIME)
public void testKillTask() throws Exception {
ISchedulerClient client = clientInstance();
Job job = createJob(NonTerminatingJob.class);
SchedulerEventListenerImpl listener = new SchedulerEventListenerImpl();
client.addEventListener(listener, true, SchedulerEvent.TASK_PENDING_TO_RUNNING);
JobId jobId = submitJob(job, client);

TaskInfo startedTask = listener.getStartedTask();
while (!startedTask.getJobId().value().equals(jobId.value())) {
startedTask = listener.getStartedTask();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain a little about this variable (startedTask)? i dont see it is used after in this test

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startedTask is a TaskInfo object, used in the loop to check that the task started corresponds to the job which was submitted.

Basically the loop goes on until the task we want to kill is started

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, understood ! thx

}

client.killTask(jobId, getTaskNameForClass(NonTerminatingJob.class));

client.removeEventListener();
// should return immediately
client.waitForJob(jobId, TimeUnit.MINUTES.toMillis(3));
}

@Test(timeout = MAX_WAIT_TIME)
public void testPushFileWithNonAdminUserPwdShouldSucceed() throws Exception {
File tmpFile = testFolder.newFile();
Expand Down Expand Up @@ -341,6 +378,8 @@ private JobId submitJob(Job job, ISchedulerClient client) throws Exception {
private static class SchedulerEventListenerImpl implements SchedulerEventListener {
private Stack<JobState> jobStateStack = new Stack<>();

private Stack<TaskInfo> taskStateStack = new Stack<>();

@Override
public void jobSubmittedEvent(JobState jobState) {
System.out.println("JobSubmittedEvent()");
Expand All @@ -352,7 +391,7 @@ public void jobSubmittedEvent(JobState jobState) {
}

public JobState getSubmittedJob() {
System.out.println("getSubmittedJbo");
System.out.println("getSubmittedJob");
synchronized (this) {
if (jobStateStack.isEmpty()) {
System.out.println("Stack is empty");
Expand All @@ -366,6 +405,21 @@ public JobState getSubmittedJob() {
}
}

public TaskInfo getStartedTask() {
System.out.println("getStartedTask");
synchronized (this) {
if (taskStateStack.isEmpty()) {
System.out.println("Stack is empty");
try {
System.out.println("wait");
wait();
} catch (InterruptedException ie) {
}
}
return taskStateStack.pop();
}
}

@Override
public void jobStateUpdatedEvent(NotificationData<JobInfo> arg0) {
}
Expand All @@ -375,7 +429,12 @@ public void schedulerStateUpdatedEvent(SchedulerEvent arg0) {
}

@Override
public void taskStateUpdatedEvent(NotificationData<TaskInfo> arg0) {
public void taskStateUpdatedEvent(NotificationData<TaskInfo> data) {
System.out.println("taskStateUpdatedEvent() : " + data);
synchronized (this) {
taskStateStack.push(data.getData());
notifyAll();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@
*/
package functionaltests;

import functionaltests.jobs.NonTerminatingJob;
import functionaltests.jobs.SimpleJob;
import functionaltests.utils.RestFuncTUtils;
import java.security.Policy;

import javax.ws.rs.core.MediaType;

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
Expand All @@ -64,8 +65,9 @@
import org.ow2.proactive.scheduler.common.task.JavaTask;
import org.ow2.proactive.scheduler.common.task.OnTaskError;

import javax.ws.rs.core.MediaType;
import java.security.Policy;
import functionaltests.jobs.NonTerminatingJob;
import functionaltests.jobs.SimpleJob;
import functionaltests.utils.RestFuncTUtils;


public abstract class AbstractRestFuncTestCase {
Expand Down Expand Up @@ -247,6 +249,10 @@ protected Job pendingJob() throws Exception {
return createJob(NonTerminatingJob.class);
}

protected String getTaskNameForClass(Class<?> clazz) {
return clazz.getSimpleName() + "Task";
}

protected Job createJob(Class<?> clazz) throws Exception {
TaskFlowJob job = new TaskFlowJob();
job.setName(clazz.getSimpleName());
Expand All @@ -256,7 +262,7 @@ protected Job createJob(Class<?> clazz) throws Exception {
job.setMaxNumberOfExecution(1);

JavaTask task = new JavaTask();
task.setName(clazz.getSimpleName() + "Task");
task.setName(getTaskNameForClass(clazz));
task.setExecutableClassName(clazz.getName());
task.setMaxNumberOfExecution(1);
task.setOnTaskError(OnTaskError.CANCEL_JOB);
Expand Down