Skip to content

Commit

Permalink
issue JPPF-624 refactored passing of task graph iformation
Browse files Browse the repository at this point in the history
  • Loading branch information
lolocohen committed Mar 25, 2020
1 parent 45fc390 commit 8eb8bad
Show file tree
Hide file tree
Showing 26 changed files with 342 additions and 190 deletions.
30 changes: 13 additions & 17 deletions client/src/java/org/jppf/client/BaseJPPFClientConnection.java
Expand Up @@ -30,6 +30,7 @@
import org.jppf.comm.socket.*;
import org.jppf.io.IOHelper;
import org.jppf.node.protocol.*;
import org.jppf.node.protocol.graph.TaskGraphInfo;
import org.jppf.serialization.*;
import org.jppf.utils.*;
import org.slf4j.*;
Expand Down Expand Up @@ -130,15 +131,13 @@ public List<Task<?>> sendTasks(final ObjectSerializer ser, final ClassLoader cl,
header.setUuid(job.getUuid());
header.setSLA(job.getSLA());
header.setMetadata(job.getMetadata());
final Set<Task<?>> dependencies = clientBundle.getDependencies();
List<Task<?>> deps = null;
if (dependencies != null) {
deps = new ArrayList<>(dependencies);
header.setParameter(BundleParameter.CLIENT_DEPENDENCY_COUNT, deps.size());
final int[] positions = new int[deps.size()];
int i = 0;
for (final Task<?> task: deps) positions[i++] = task.getPosition();
header.setParameter(BundleParameter.CLIENT_DEPENDENCY_POSITIONS, positions);
List<? extends PositionalElement<?>> deps = null;
TaskGraphInfo graphInfo = null;
if ((clientBundle.getClientJob().getTaskGraph() != null) && job.getClientSLA().isGraphTraversalInClient()) graphInfo = clientBundle.getGraphInfo();
if (graphInfo != null) {
deps = graphInfo.getDependencies();
header.setParameter(BundleParameter.JOB_TASK_GRAPH_INFO, graphInfo);
final int[] positions = graphInfo.getDependenciesPositions();
if (debugEnabled) log.debug("sending {} dependencies with positions {}", deps.size(), Arrays.toString(positions));
}
if (debugEnabled) log.debug("found {} dependencies for bundle {}", (deps == null ? 0 : deps.size()), clientBundle);
Expand All @@ -153,9 +152,7 @@ public List<Task<?>> sendTasks(final ObjectSerializer ser, final ClassLoader cl,
IOHelper.sendData(socketClient, null, ser);
}
final List<Task<?>> notSerializableTasks = sendTasks(job, ser, socketClient, tasks);
if (deps != null) {
sendTasks(job, ser, socketClient, deps);
}
if (deps != null) sendTasks(job, ser, socketClient, deps);
socketClient.flush();
return notSerializableTasks;
}
Expand All @@ -169,16 +166,16 @@ public List<Task<?>> sendTasks(final ObjectSerializer ser, final ClassLoader cl,
* @return a list of tasks that couldn't be serialized, possibly empty.
* @throws Exception if an error occurs while sending the request.
*/
private static List<Task<?>> sendTasks(final JPPFJob job, final ObjectSerializer ser, final SocketWrapper socketClient, final Collection<Task<?>> tasks) throws Exception {
private static List<Task<?>> sendTasks(final JPPFJob job, final ObjectSerializer ser, final SocketWrapper socketClient, final List<? extends PositionalElement<?>> tasks) throws Exception {
final List<Task<?>> notSerializableTasks = new ArrayList<>(tasks.size());
for (final Task<?> task : tasks) {
for (final PositionalElement<?> task : tasks) {
try {
IOHelper.sendData(socketClient, task, ser);
} catch(final NotSerializableException e) {
log.error("error serializing task {} for {} : {}", task, job, ExceptionUtils.getStackTrace(e));
task.setThrowable(e);
((Task<?>) task).setThrowable(e);
IOHelper.sendNullData(socketClient);
notSerializableTasks.add(task);
notSerializableTasks.add((Task<?>) task);
}
}
return notSerializableTasks;
Expand All @@ -201,7 +198,6 @@ private List<Task<?>> prepareTasksToSend(final TaskBundle header, final ClientTa
for (final Task<?> task : allTasks) {
final int pos = task.getPosition();
if (!job.getResults().hasResult(pos)) {
//tasks[i] = task;
tasks.add(task);
positions[i] = pos;
maxResubmits[i] = task.getMaxResubmits();
Expand Down
Expand Up @@ -201,8 +201,8 @@ static TaskBundle createBundle(final ClientTaskBundle clientBundle, final long b
bundle.setUuid(clientBundle.getUuid());
bundle.setParameter(BundleParameter.CLIENT_BUNDLE_ID, bundleId);
final ClientJob job = clientBundle.getClientJob();
//if ((job.getTaskGraph() != null) && !job.getClientSLA().isGraphTraversalInClient()) bundle.setParameter(BundleParameter.JOB_TASK_GRAPH, job.getTaskGraph());
if (job.getTaskGraph() != null) bundle.setParameter(BundleParameter.JOB_TASK_GRAPH, job.getTaskGraph());
if ((job.getTaskGraph() != null) && !job.getClientSLA().isGraphTraversalInClient()) bundle.setParameter(BundleParameter.JOB_TASK_GRAPH, job.getTaskGraph());
//if (job.getTaskGraph() != null) bundle.setParameter(BundleParameter.JOB_TASK_GRAPH, job.getTaskGraph());
return bundle;
}

Expand Down
1 change: 0 additions & 1 deletion client/src/java/org/jppf/client/balancer/ClientJob.java
Expand Up @@ -50,7 +50,6 @@ public class ClientJob extends AbstractClientJob {
* The list of the tasks.
*/
final Map<Integer, Task<?>> tasks;
//private final List<Task<?>> tasks;
/**
* The broadcast UUID, i.e. the uuid of the connection the job is broadcast to.
*/
Expand Down
57 changes: 30 additions & 27 deletions client/src/java/org/jppf/client/balancer/ClientTaskBundle.java
Expand Up @@ -21,9 +21,9 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

import org.jppf.client.JPPFJob;
import org.jppf.client.*;
import org.jppf.node.protocol.*;
import org.jppf.node.protocol.graph.TaskGraph;
import org.jppf.node.protocol.graph.*;
import org.jppf.utils.collections.*;
import org.slf4j.*;

Expand Down Expand Up @@ -63,14 +63,6 @@ public class ClientTaskBundle extends JPPFTaskBundle {
* The tasks to be executed by the node.
*/
private transient List<Task<?>> tasks;
/**
* The dependencies of the tasks.
*/
private transient Set<Task<?>> dependencies;
/**
* A mapping of tasks to their dependencies.
*/
private CollectionMap<Integer, Integer> dependenciesMap;
/**
* The broadcast UUID.
*/
Expand All @@ -83,6 +75,10 @@ public class ClientTaskBundle extends JPPFTaskBundle {
* Job cancel indicator.
*/
private boolean cancelled;
/**
* Information about the task graph, if any, for a job.
*/
private TaskGraphInfo graphInfo;

/**
* Initialize this task bundle and set its build number.
Expand Down Expand Up @@ -258,40 +254,47 @@ public Long getBundleId() {
private void resolveDependencies() {
final TaskGraph graph = job.getTaskGraph();
if (graph == null) return;
if (!job.getJob().getClientSLA().isGraphTraversalInClient()) return;
List<PositionalElement<?>> dependencies = null;
CollectionMap<Integer, Integer> dependenciesMap = null;
final JobResults jobResults = job.getJob().getResults();
final Set<Integer> nullResultPositions = new HashSet<>();
for (final Task<?> task: tasks) {
final TaskGraph.Node node = graph.nodeAt(task.getPosition());
if (node == null) continue;
final List<TaskGraph.Node> deps = node.getDependencies();
if ((deps != null) && !deps.isEmpty()) {
for (final TaskGraph.Node dep: deps) {
if (dependencies == null) {
dependencies = new HashSet<>();
dependencies = new ArrayList<>();
dependenciesMap = new ArrayListHashMap<>();
}
final Task<?> depTask = job.getJob().getResults().getResultTask(dep.getPosition());
if (depTask == null) log.warn("server task null for dependency {} added to {}", depTask, task);
else {
final int depPosition = dep.getPosition();
if (nullResultPositions.contains(depPosition)) continue;
final Task<?> depTask = jobResults.getResultTask(depPosition);
if (depTask == null) {
log.warn("null dependency at position {} added to {}", depPosition, task);
nullResultPositions.add(depPosition);
} else {
dependencies.add(depTask);
dependenciesMap.putValue(task.getPosition(), dep.getPosition());
dependenciesMap.putValue(task.getPosition(), depPosition);
}
}
}
}
if (dependencies != null) {
final int[] depsPositions = new int[dependencies.size()];
int count = 0;
for (PositionalElement<?> elt: dependencies) depsPositions[count++] = elt.getPosition();
this.graphInfo = new TaskGraphInfo(dependencies.size(), dependenciesMap, depsPositions);
this.graphInfo.setDependencies(dependencies);
}
}

/**
* Get a canonical set of direct dependencies for all the tasks in this dispatch bundle.
* @return a {@code Set} of {@link ServerTask} instances.
*/
public Set<Task<?>> getDependencies() {
return dependencies;
}

/**
* Get the mapping of tasks to their dependencies.
* @return a mming of taks positions to the positions of their dependencies.
* @return information about the task graph, if any, for a job.
*/
public CollectionMap<Integer, Integer> getDependenciesMap() {
return dependenciesMap;
public TaskGraphInfo getGraphInfo() {
return graphInfo;
}
}
20 changes: 4 additions & 16 deletions common/src/java/org/jppf/node/protocol/BundleParameter.java
Expand Up @@ -169,23 +169,11 @@ public enum BundleParameter {
*/
JOB_TASK_GRAPH,
/**
* Whether a job graph is already being handled by a driver.
*/
JOB_GRAPH_ALREADY_HANDLED,
/**
* A mapping of tasks positions to the positions of their dependencies.
*/
NODE_DEPENDENCY_MAPPING,
/**
* Number of dependencies in a dispatched task bundle.
*/
NODE_DEPENDENCY_COUNT,
/**
* Number of dependencies in a task bundle dispatched by a client.
* Info on task dependencies within a task bundle.
*/
CLIENT_DEPENDENCY_COUNT,
JOB_TASK_GRAPH_INFO,
/**
* Positions of the dependencies in a task bundle dispatched by a client.
* Whether a job graph is already being handled by a driver.
*/
CLIENT_DEPENDENCY_POSITIONS
JOB_GRAPH_ALREADY_HANDLED
}
43 changes: 43 additions & 0 deletions common/src/java/org/jppf/node/protocol/PositionalElement.java
@@ -0,0 +1,43 @@
/*
* JPPF.
* Copyright (C) 2005-2019 JPPF Team.
* http://www.jppf.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jppf.node.protocol;

/**
* Interface representig an object that cn be accessed by its position in an ordered set.
* @param <T> the type of this element.
* @author Laurent Cohen
*/
public interface PositionalElement<T extends PositionalElement<T>> {
/**
* Returns the position of this element in its container.
* @return the position of this element as an {@code int}.
* @exclude
*/
int getPosition();

/**
* Set the position of this element in its container.
* @param position the position of this task as an {@code int}.
* @return this element, for method call chaining.
* @exclude
*/
default T setPosition(int position) {
return null;
}
}
17 changes: 1 addition & 16 deletions common/src/java/org/jppf/node/protocol/Task.java
Expand Up @@ -29,7 +29,7 @@
* @param <T> the type of results produced by the task.
* @author Laurent Cohen
*/
public interface Task<T> extends Runnable, Serializable, Interruptibility {
public interface Task<T> extends Runnable, Serializable, Interruptibility, PositionalElement<Task<T>> {
/**
* Get the result of the task execution.
* @return the task result.
Expand Down Expand Up @@ -119,21 +119,6 @@ public interface Task<T> extends Runnable, Serializable, Interruptibility {
*/
Task<T> setTimeoutSchedule(JPPFSchedule timeoutSchedule);

/**
* Returns the position of this task in the job in which it was submitted.
* @return the position of this task as an {@code int}.
* @exclude
*/
int getPosition();

/**
* Set the position of this task in the job in which it was submitted.
* @param position the position of this task as an <code>int</code>.
* @return this task, for method chaining.
* @exclude
*/
Task<T> setPosition(int position);

/**
* Determine whether this task is executing within a node, or locally on the client side.
* @return <code>true</code> if this task is executing in a node, <code>false</code> if it is on the client side.
Expand Down
7 changes: 3 additions & 4 deletions common/src/java/org/jppf/node/protocol/graph/TaskGraph.java
Expand Up @@ -21,6 +21,7 @@
import java.io.*;
import java.util.*;

import org.jppf.node.protocol.PositionalElement;
import org.jppf.serialization.SerializationUtils;
import org.jppf.utils.collections.*;

Expand Down Expand Up @@ -337,7 +338,7 @@ public int getDoneCount() {
* A node in the graph of the tasks in a job which represents a task and its dependants.
* @exclude
*/
public static class Node implements Serializable {
public static class Node implements Serializable, PositionalElement<Node> {
/**
* The dependencies of this task, if any.
*/
Expand Down Expand Up @@ -378,9 +379,7 @@ public List<Node> getDependencies() {
return dependencies;
}

/**
* @return the position of this task in the job.
*/
@Override
public int getPosition() {
return position;
}
Expand Down

0 comments on commit 8eb8bad

Please sign in to comment.