Skip to content

Commit

Permalink
Clear Executor stack
Browse files Browse the repository at this point in the history
  • Loading branch information
Theosakamg committed Jun 27, 2017
1 parent 1908f2d commit 078ca01
Show file tree
Hide file tree
Showing 6 changed files with 446 additions and 287 deletions.
2 changes: 2 additions & 0 deletions rcljava/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ if(BUILD_TESTING)
PROPERTY "JAR_FILE")

set(${PROJECT_NAME}_test_sources
"src/test/java/org/ros2/rcljava/ExecutorTest.java"
"src/test/java/org/ros2/rcljava/GraphNameTest.java"
"src/test/java/org/ros2/rcljava/NodeTest.java"
# "src/test/java/org/ros2/rcljava/MessageTest.java"
Expand All @@ -246,6 +247,7 @@ if(BUILD_TESTING)
)

set(${PROJECT_NAME}_testsuites
"org.ros2.rcljava.ExecutorTest"
"org.ros2.rcljava.GraphNameTest"
"org.ros2.rcljava.NodeTest"
# "org.ros2.rcljava.MessageTest"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@
package org.ros2.rcljava.executor;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;

import org.ros2.rcljava.RCLJava;
import org.ros2.rcljava.node.Node;

public abstract class BaseThreadedExecutor implements ThreadedExecutor {

protected Object mutex = new Object();
protected NativeExecutor baseExecutor;
protected final Object mutex = new Object();
protected final NativeExecutor baseExecutor;
protected volatile ExecutorService executorService;

protected BlockingQueue<Node> nodes = new LinkedBlockingQueue<Node>();
protected final BlockingQueue<Node> nodes = new LinkedBlockingQueue<Node>();

public BaseThreadedExecutor() {
this.baseExecutor = new NativeExecutor(this);
Expand Down Expand Up @@ -69,18 +69,25 @@ public void removeNode(Node node, boolean notify) {
public void spinSome() {
AnyExecutable anyExecutable = this.baseExecutor.getNextExecutable();
while (RCLJava.ok() && anyExecutable != null) {
this.baseExecutor.executeAnyExecutable(anyExecutable);
BaseThreadedExecutor.executeAnyExecutable(anyExecutable);
anyExecutable = this.baseExecutor.getNextExecutable(0);
}

}

@Override
public void spinOnce(long timeout) {
AnyExecutable anyExecutable = this.baseExecutor.getNextExecutable(timeout);
final AnyExecutable anyExecutable = this.baseExecutor.getNextExecutable(timeout);

if (anyExecutable != null) {
this.baseExecutor.executeAnyExecutable(anyExecutable);
BaseThreadedExecutor.executeAnyExecutable(anyExecutable);
}
}

private static void executeAnyExecutable(final AnyExecutable anyExecutable) {
try {
NativeExecutor.executeAnyExecutable(anyExecutable);
} catch (Exception e) {
e.printStackTrace();
}
}

Expand All @@ -101,9 +108,7 @@ public void spinNodeSome(Node node) {
@Override
public void run() {
while (RCLJava.ok()) {
synchronized (mutex) {
this.spinOnce(0);
}
this.spinOnce(0);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.ros2.rcljava.executor;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Pool-multiple-threaded executor implementation
Expand Down Expand Up @@ -48,6 +49,7 @@ public MultiThreadedExecutor(final int numberOfThreads) {
* Pool-multiple threaded implementation of spin.
* This function will block until work comes in, execute it, and keep blocking.
* It will only be interrupt by a CTRL-C (managed by the global signal handler).
* @throws InterruptedException
*/
@Override
public void spin() {
Expand All @@ -59,6 +61,11 @@ public void spin() {

if (!this.executorService.isShutdown()) {
this.executorService.shutdown();
try {
this.executorService.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Expand Down
20 changes: 10 additions & 10 deletions rcljava/src/main/java/org/ros2/rcljava/executor/NativeExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public AnyExecutable getNextExecutable() {
* @param timeout
* @return
*/
public AnyExecutable getNextExecutable(long timeout) {
public synchronized AnyExecutable getNextExecutable(long timeout) {
AnyExecutable anyExecutable = this.getNextReadyExecutable();

if (anyExecutable == null) {
Expand All @@ -176,34 +176,34 @@ public AnyExecutable getNextExecutable(long timeout) {
* Execute any type from Memory.
* @param anyExecutable
*/
public void executeAnyExecutable(final AnyExecutable anyExecutable) {
public static void executeAnyExecutable(final AnyExecutable anyExecutable) {

if (anyExecutable == null) {
return;
}

if (anyExecutable.timer != null) {
this.executeTimer(anyExecutable.timer);
NativeExecutor.executeTimer(anyExecutable.timer);
}

if (anyExecutable.subscription != null) {
this.executeSubscription(anyExecutable.subscription);
NativeExecutor.executeSubscription(anyExecutable.subscription);
}

if (anyExecutable.service != null) {
this.executeService(anyExecutable.service);
NativeExecutor.executeService(anyExecutable.service);
}

if (anyExecutable.client != null) {
this.executeClient(anyExecutable.client);
NativeExecutor.executeClient(anyExecutable.client);
}
}

/**
* Execute Timer from Memory.
* @param timer to execute.
*/
private void executeTimer(final WallTimer timer) {
private static void executeTimer(final WallTimer timer) {
timer.callTimer();
timer.getCallback().tick();
}
Expand All @@ -213,7 +213,7 @@ private void executeTimer(final WallTimer timer) {
* @param subscription to execute.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
private void executeSubscription(final Subscription subscription) {
private static void executeSubscription(final Subscription subscription) {
NativeSubscription<? extends Message> nativeSubscription = (NativeSubscription<?> ) subscription;
Message message = RCLJava.nativeTake(nativeSubscription.getSubscriptionHandle(), nativeSubscription.getMessageType());
if (message != null) {
Expand All @@ -226,7 +226,7 @@ private void executeSubscription(final Subscription subscription) {
* @param service to execute.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
private void executeService(final Service service) {
private static void executeService(final Service service) {
Class<?> requestType = service.getRequestType();
Class<?> responseType = service.getResponseType();

Expand Down Expand Up @@ -267,7 +267,7 @@ private void executeService(final Service service) {
}

@SuppressWarnings("rawtypes")
private void executeClient(Client client) {
private static void executeClient(Client client) {

}
}
Loading

0 comments on commit 078ca01

Please sign in to comment.