Skip to content

Commit

Permalink
Persistent Tasks: PersistentTaskRequest -> PersistTaskParams (elastic…
Browse files Browse the repository at this point in the history
…#1057)

Removes the last pieces of ActionRequest from PersistentTaskRequest and renames it into PersistTaskParams, which is now just an interface that extends NamedWriteable and ToXContent.
  • Loading branch information
imotov authored and martijnvg committed Jan 31, 2018
1 parent 6bfea09 commit abd9ae3
Show file tree
Hide file tree
Showing 17 changed files with 300 additions and 251 deletions.
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.persistent;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -33,10 +34,10 @@ public NodePersistentTasksExecutor(ThreadPool threadPool) {
this.threadPool = threadPool;
}

public <Request extends PersistentTaskRequest> void executeTask(Request request,
AllocatedPersistentTask task,
PersistentTasksExecutor<Request> action) {
threadPool.executor(action.getExecutor()).execute(new AbstractRunnable() {
public <Params extends PersistentTaskParams> void executeTask(@Nullable Params params,
AllocatedPersistentTask task,
PersistentTasksExecutor<Params> executor) {
threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
task.markAsFailed(e);
Expand All @@ -46,7 +47,7 @@ public void onFailure(Exception e) {
@Override
protected void doRun() throws Exception {
try {
action.nodeOperation(task, request);
executor.nodeOperation(task, params);
} catch (Exception ex) {
task.markAsFailed(ex);
}
Expand Down
Expand Up @@ -16,20 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.persistent;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

/**
* Base class for a request for a persistent task
* Parameters used to start persistent task
*/
public abstract class PersistentTaskRequest extends ActionRequest implements NamedWriteable, ToXContent {
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new AllocatedPersistentTask(id, type, action, getDescription(), parentTaskId);
}
public interface PersistentTaskParams extends NamedWriteable, ToXContent {

}
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -58,22 +59,22 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR
* Creates a new persistent task on master node
*
* @param action the action name
* @param request request
* @param params params
* @param listener the listener that will be called when task is started
*/
public <Request extends PersistentTaskRequest> void createPersistentTask(String taskId, String action, Request request,
ActionListener<PersistentTask<?>> listener) {
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String action, @Nullable Params params,
ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksCustomMetaData.Builder builder = builder(currentState);
if (builder.hasTask(taskId)) {
throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist");
}
validate(action, clusterService.state(), request);
validate(action, clusterService.state(), params);
final Assignment assignment;
assignment = getAssignement(action, currentState, request);
return update(currentState, builder.addTask(taskId, action, request, assignment));
assignment = getAssignement(action, currentState, params);
return update(currentState, builder.addTask(taskId, action, params, assignment));
}

@Override
Expand Down Expand Up @@ -205,14 +206,15 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

private <Request extends PersistentTaskRequest> Assignment getAssignement(String taskName, ClusterState currentState, Request request) {
PersistentTasksExecutor<Request> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
return persistentTasksExecutor.getAssignment(request, currentState);
private <Params extends PersistentTaskParams> Assignment getAssignement(String taskName, ClusterState currentState,
@Nullable Params params) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
return persistentTasksExecutor.getAssignment(params, currentState);
}

private <Request extends PersistentTaskRequest> void validate(String taskName, ClusterState currentState, Request request) {
PersistentTasksExecutor<Request> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
persistentTasksExecutor.validate(request, currentState);
private <Params extends PersistentTaskParams> void validate(String taskName, ClusterState currentState, @Nullable Params params) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
persistentTasksExecutor.validate(params, currentState);
}

@Override
Expand All @@ -229,7 +231,7 @@ public void clusterChanged(ClusterChangedEvent event) {
}

interface ExecutorNodeDecider {
<Request extends PersistentTaskRequest> Assignment getAssignment(String action, ClusterState currentState, Request request);
<Params extends PersistentTaskParams> Assignment getAssignment(String action, ClusterState currentState, Params params);
}

static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) {
Expand All @@ -245,7 +247,7 @@ static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecid
if (taskInProgress.needsReassignment(event.state().nodes())) {
// there is an unassigned task or task with a disappeared node - we need to try assigning it
if (Objects.equals(taskInProgress.getAssignment(),
decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getRequest())) == false) {
decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getParams())) == false) {
// it looks like a assignment for at least one task is possible - let's trigger reassignment
reassignmentRequired = true;
break;
Expand Down Expand Up @@ -290,7 +292,7 @@ static ClusterState reassignTasks(ClusterState currentState, Logger logger, Exec
for (PersistentTask<?> task : tasks.tasks()) {
if (task.needsReassignment(nodes)) {
// there is an unassigned task - we need to try assigning it
Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getRequest());
Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getParams());
if (Objects.equals(assignment, task.getAssignment()) == false) {
logger.trace("reassigning task {} from node {} to node {}", task.getId(),
task.getAssignment().getExecutorNode(), assignment.getExecutorNode());
Expand Down

0 comments on commit abd9ae3

Please sign in to comment.