Skip to content

Commit

Permalink
implementing issue JPPF-564 - node side changes
Browse files Browse the repository at this point in the history
  • Loading branch information
lolocohen committed Jan 5, 2019
1 parent 06d2376 commit 101fae9
Show file tree
Hide file tree
Showing 178 changed files with 3,106 additions and 1,747 deletions.
Expand Up @@ -192,7 +192,8 @@ private synchronized void addNode(final TopologyDriver driverData, final Topolog
final DefaultMutableTreeNode nodeNode = TopologyUtils.addNode(model, driverData, nodeData);
if (nodeNode != null) {
final DefaultMutableTreeNode driverNode = (DefaultMutableTreeNode) nodeNode.getParent();
if ((driverNode.getChildCount() == 1) && !treeTable.isCollapsed(driverNode)) treeTable.expand(driverNode);
//if ((driverNode.getChildCount() == 1) && !treeTable.isCollapsed(driverNode)) treeTable.expand(driverNode);
if (driverNode.getChildCount() == 1) treeTable.expand(driverNode);
}
}

Expand Down
Expand Up @@ -77,7 +77,7 @@ public class ChannelWrapperLocal extends ChannelWrapper implements ClientConnect
*/
public ChannelWrapperLocal(final JPPFClient client) {
this.client = client;
executionManager = new ClientExecutionManager(JPPFProperties.LOCAL_EXECUTION_THREADS);
executionManager = new ClientExecutionManager(client.getConfig(), JPPFProperties.LOCAL_EXECUTION_THREADS);
priority = client.getConfig().get(JPPFProperties.LOCAL_EXECUTION_PRIORITY);
systemInfo = new JPPFSystemInformation(client.getConfig(), getConnectionUuid(), true, false);
managementInfo = new JPPFManagementInfo("local", "local", -1, getConnectionUuid(), JPPFManagementInfo.NODE | JPPFManagementInfo.LOCAL, false);
Expand Down
Expand Up @@ -45,10 +45,11 @@ public class ClientExecutionManager extends AbstractExecutionManager {

/**
* Initialize this execution manager.
* @param config the configuration to get the thread manager properties from.
* @param nbThreadsProperty the name of the property which configures the number of threads.
*/
public ClientExecutionManager(final JPPFProperty<Integer> nbThreadsProperty) {
super(nbThreadsProperty);
public ClientExecutionManager(final TypedProperties config, final JPPFProperty<Integer> nbThreadsProperty) {
super(config, nbThreadsProperty);
}

/**
Expand All @@ -58,7 +59,8 @@ public ClientExecutionManager(final JPPFProperty<Integer> nbThreadsProperty) {
*/
@Override
protected void setup(final TaskBundle bundle, final List<Task<?>> taskList) {
taskNotificationDispatcher.setBundle(this.bundle = bundle);
this.bundle = bundle;
//taskNotificationDispatcher.setBundle(bundle);
this.taskList = taskList;
this.taskWrapperList = new ArrayList<>(taskList.size());
this.dataProvider = taskList.get(0).getDataProvider();
Expand All @@ -84,7 +86,7 @@ protected void cleanup() {
this.dataProvider = null;
usedClassLoader.dispose();
usedClassLoader = null;
taskNotificationDispatcher.setBundle(this.bundle = null);
//taskNotificationDispatcher.setBundle(this.bundle = null);
this.taskList = null;
this.uuidList = null;
setJobCancelled(false);
Expand Down
Expand Up @@ -230,7 +230,8 @@ public String toString() {
/**
* @return the id for this bundle.
*/
public long getBundleId() {
@Override
public Long getBundleId() {
return bundleId;
}
}
63 changes: 11 additions & 52 deletions common/src/java/org/jppf/execute/AbstractExecutionManager.java
Expand Up @@ -53,19 +53,19 @@ public abstract class AbstractExecutionManager implements ExecutionManager {
/**
* The bundle whose tasks are currently being executed.
*/
protected TaskBundle bundle = null;
protected TaskBundle bundle;
/**
* The list of tasks to execute.
*/
protected List<Task<?>> taskList = null;
protected List<Task<?>> taskList;
/**
* The uuid path of the current bundle.
*/
protected List<String> uuidList = null;
protected List<String> uuidList;
/**
* Holds a the tasks submitted tot he executor.
* Holds the tasks submitted to the executor.
*/
protected List<NodeTaskWrapper> taskWrapperList = null;
protected List<NodeTaskWrapper> taskWrapperList;
/**
* Dispatches tasks notifications to registered listeners.
*/
Expand All @@ -89,60 +89,24 @@ public abstract class AbstractExecutionManager implements ExecutionManager {
/**
* The class loader used to load the tasks and the classes they need from the client.
*/
protected UsedClassLoader usedClassLoader = null;
protected UsedClassLoader usedClassLoader;
/**
* The data provider for the current job.
*/
protected DataProvider dataProvider = null;
protected DataProvider dataProvider;
/**
* The total accumulated elapsed time of the tasks in the current bundle.
*/
protected final AtomicLong accumulatedElapsed = new AtomicLong(0L);

/**
* Initialize this execution manager with the specified node.
* @param config the configuration to get the thread manager properties from.
* @param nbThreadsProperty the name of the property which configures the number of threads.
*/
public AbstractExecutionManager(final JPPFProperty<Integer> nbThreadsProperty) {
public AbstractExecutionManager(final TypedProperties config, final JPPFProperty<Integer> nbThreadsProperty) {
taskNotificationDispatcher = new TaskExecutionDispatcher(getClass().getClassLoader());
int poolSize = JPPFConfiguration.get(nbThreadsProperty);
if (poolSize <= 0) poolSize = Runtime.getRuntime().availableProcessors();
JPPFConfiguration.set(nbThreadsProperty, poolSize);
log.info("running " + poolSize + " processing thread" + (poolSize > 1 ? "s" : ""));
threadManager = createThreadManager(poolSize);
}

/**
* Create the thread manager instance. Default is {@link ThreadManagerThreadPool}.
* @param poolSize the initial pool size.
* @return an instance of {@link ThreadManager}.
*/
protected static ThreadManager createThreadManager(final int poolSize) {
ThreadManager result = null;
final TypedProperties config = JPPFConfiguration.getProperties();
final String s = config.get(JPPFProperties.THREAD_MANAGER_CLASS);
if (!"default".equalsIgnoreCase(s) && !ThreadManagerThreadPool.class.getName().equals(s) && s != null) {
try {
final Class<?> clazz = Class.forName(s);
final Object instance = ReflectionHelper.invokeConstructor(clazz, new Class[]{Integer.TYPE}, poolSize);
if (instance instanceof ThreadManager) {
result = (ThreadManager) instance;
log.info("Using custom thread manager: " + s);
}
} catch(final Exception e) {
log.error(e.getMessage(), e);
}
}
if (result == null) {
log.info("Using default thread manager");
return new ThreadManagerThreadPool(poolSize);
}
config.set(JPPFProperties.PROCESSING_THREADS, result.getPoolSize());
log.info("Node running " + poolSize + " processing thread" + (poolSize > 1 ? "s" : ""));
final boolean cpuTimeEnabled = result.isCpuTimeEnabled();
config.setBoolean("cpuTimeSupported", cpuTimeEnabled);
log.info("Thread CPU time measurement is " + (cpuTimeEnabled ? "" : "not ") + "supported");
return result;
threadManager = ThreadManager.newInstance(config, nbThreadsProperty);
}

@Override
Expand All @@ -159,7 +123,7 @@ public void execute(final TaskBundle bundle, final List<Task<?>> taskList) throw
for (final Task<?> task : taskList) {
if (!(task instanceof JPPFExceptionResult)) {
if (task instanceof AbstractTask) ((AbstractTask<?>) task).setExecutionDispatcher(taskNotificationDispatcher);
final NodeTaskWrapper taskWrapper = new NodeTaskWrapper(task, usedClassLoader.getClassLoader(), timeoutHandler, threadManager.isCpuTimeEnabled());
final NodeTaskWrapper taskWrapper = new NodeTaskWrapper(task, usedClassLoader.getClassLoader(), timeoutHandler);
taskWrapperList.add(taskWrapper);
ecs.submit(taskWrapper, taskWrapper);
count++;
Expand All @@ -172,11 +136,6 @@ public void execute(final TaskBundle bundle, final List<Task<?>> taskList) throw
final Future<NodeTaskWrapper> future = ecs.take();
if (!future.isCancelled()) {
final NodeTaskWrapper taskWrapper = future.get();
final JPPFReconnectionNotification notif = taskWrapper.getReconnectionNotification();
if (notif != null) {
cancelAllTasks(true, false);
throw notif;
}
taskEnded(taskWrapper);
}
} catch (final Exception e) {
Expand Down
6 changes: 3 additions & 3 deletions common/src/java/org/jppf/execute/ExecutionManager.java
Expand Up @@ -57,7 +57,7 @@ public interface ExecutionManager {

/**
* Get the executor used by this execution manager.
* @return an <code>ExecutorService</code> instance.
* @return an {@code ExecutorService} instance.
*/
ExecutorService getExecutor();

Expand Down Expand Up @@ -93,13 +93,13 @@ public interface ExecutionManager {

/**
* Determine whether the current job has been cancelled, including before starting its execution.
* @return <code>true</code> if the job has been cancelled, <code>false</code> otherwise.
* @return {@code true} if the job has been cancelled, {@code false} otherwise.
*/
boolean isJobCancelled();

/**
* Specify whether the current job has been cancelled, including before starting its execution.
* @param jobCancelled <code>true</code> if the job has been cancelled, <code>false</code> otherwise.
* @param jobCancelled {@code true} if the job has been cancelled, {@code false} otherwise.
*/
void setJobCancelled(boolean jobCancelled);

Expand Down
81 changes: 49 additions & 32 deletions common/src/java/org/jppf/execute/NodeTaskWrapper.java
Expand Up @@ -18,8 +18,9 @@
package org.jppf.execute;

import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import org.jppf.JPPFReconnectionNotification;
import org.jppf.execute.async.JobProcessingEntry;
import org.jppf.node.protocol.*;
import org.jppf.scheduling.*;
import org.jppf.utils.ExceptionUtils;
Expand All @@ -37,31 +38,31 @@ public class NodeTaskWrapper implements Runnable {
/**
* Logger for this class.
*/
private static Logger log = LoggerFactory.getLogger(NodeTaskWrapper.class);
private static final Logger log = LoggerFactory.getLogger(NodeTaskWrapper.class);
/**
* Determines whether the debug level is enabled in the log configuration, without the cost of a method call.
*/
private static boolean traceEnabled = log.isTraceEnabled();
private static final boolean traceEnabled = log.isTraceEnabled();
/**
* Timer managing the tasks timeout.
*/
private final JPPFScheduleHandler timeoutHandler;
/**
* Indicator whether task was cancelled;
*/
private boolean cancelled = false;
private boolean cancelled;
/**
* Indicator whether <code>onCancel</code> should be called when cancelled.
*/
private boolean callOnCancel = false;
private boolean callOnCancel;
/**
* Indicator whether task timeout.
*/
private boolean timeout = false;
private boolean timeout;
/**
* Indicator that task was started.
*/
private boolean started = false;
private boolean started;
/**
* The task to execute within a try/catch block.
*/
Expand All @@ -70,15 +71,11 @@ public class NodeTaskWrapper implements Runnable {
/**
* The future created by the executor service.
*/
private Future<?> future = null;
/**
*
*/
private JPPFReconnectionNotification reconnectionNotification = null;
private Future<?> future;
/**
* Holds the used cpu time for this task.
*/
private ExecutionInfo executionInfo = null;
private ExecutionInfo executionInfo;
/**
* The elapsed time for this task's execution.
*/
Expand All @@ -87,18 +84,37 @@ public class NodeTaskWrapper implements Runnable {
* The class loader that was used to load the task class.
*/
private final ClassLoader taskClassLoader;
/**
* Encapsulates information about the job the task is a part of.
*/
private final JobProcessingEntry jobEntry;
/**
* Whether this task has ended.
*/
private final AtomicBoolean ended = new AtomicBoolean(false);

/**
* Initialize this task wrapper with a specified JPPF task.
* @param task the task to execute within a try/catch block.
* @param taskClassLoader the class loader that was used to load the task class.
* @param timeoutHandler handles the timeout for this task.
*/
public NodeTaskWrapper(final Task<?> task, final ClassLoader taskClassLoader, final JPPFScheduleHandler timeoutHandler) {
this(null, task, taskClassLoader, timeoutHandler);
}

/**
* Initialize this task wrapper with a specified JPPF task.
* @param jobEntry encapsulates information about the job the task is a part of.
* @param task the task to execute within a try/catch block.
* @param taskClassLoader the class loader that was used to load the task class.
* @param timeoutHandler handles the timeout for this task.
* @param cpuTimeEnabled whether cpu time is supported/enabled.
*/
public NodeTaskWrapper(final Task<?> task, final ClassLoader taskClassLoader, final JPPFScheduleHandler timeoutHandler, final boolean cpuTimeEnabled) {
public NodeTaskWrapper(final JobProcessingEntry jobEntry, final Task<?> task, final ClassLoader taskClassLoader, final JPPFScheduleHandler timeoutHandler) {
this.task = task;
this.taskClassLoader = taskClassLoader;
this.timeoutHandler = timeoutHandler;
this.jobEntry = jobEntry;
}

/**
Expand Down Expand Up @@ -145,8 +161,7 @@ synchronized void timeout() {
*/
@SuppressWarnings("unchecked")
@Override
public void run()
{
public void run() {
if (traceEnabled) log.trace(toString());
started = true;
final long id = Thread.currentThread().getId();
Expand All @@ -158,8 +173,6 @@ public void run()
Thread.currentThread().setContextClassLoader(taskClassLoader);
executionInfo = CpuTimeCollector.computeExecutionInfo(id);
if (!isCancelledOrTimedout()) task.run();
} catch(final JPPFReconnectionNotification t) {
reconnectionNotification = t;
} catch(final Throwable t) {
task.setThrowable(t);
if (t instanceof UnsatisfiedLinkError) task.setResult(ExceptionUtils.getStackTrace(t));
Expand All @@ -169,20 +182,18 @@ public void run()
try {
elapsedTime = System.nanoTime() - startTime;
if (executionInfo != null) executionInfo = CpuTimeCollector.computeExecutionInfo(id).subtract(executionInfo);
} catch(final JPPFReconnectionNotification t) {
if (reconnectionNotification == null) reconnectionNotification = t;
} catch(@SuppressWarnings("unused") final Throwable ignore) {
} catch(final Throwable e) {
if (traceEnabled) log.trace("error in finally of {}", this, e);
}
try {
silentTimeout();
silentCancel();
} catch(final JPPFReconnectionNotification t) {
if (reconnectionNotification == null) reconnectionNotification = t;
} catch (final Throwable t) {
task.setThrowable(t);
}
if (task.getThrowable() instanceof InterruptedException) task.setThrowable(null);
cancelTimeoutAction();
taskEnded();
}
}

Expand Down Expand Up @@ -264,14 +275,6 @@ public void setFuture(final Future<?> future) {
this.future = future;
}

/**
* Get the reconnection notification thrown by the atysk execution, if any.
* @return a {@link JPPFReconnectionNotification} or <code>null</code>.
*/
JPPFReconnectionNotification getReconnectionNotification() {
return reconnectionNotification;
}

/**
* Remove the specified future from the pending set and notify
* all threads waiting for the end of the execution.
Expand All @@ -295,4 +298,18 @@ public ExecutionInfo getExecutionInfo() {
public long getElapsedTime() {
return elapsedTime;
}

/**
* @return the object which encapsulates information about the job the task is a part of.
*/
public JobProcessingEntry getJobEntry() {
return jobEntry;
}

/**
* called when a task terminates.
*/
public void taskEnded() {
if (ended.compareAndSet(false, true)) jobEntry.executionManager.taskEnded(this);
}
}

0 comments on commit 101fae9

Please sign in to comment.