Skip to content

Commit

Permalink
Add Executor stack
Browse files Browse the repository at this point in the history
  • Loading branch information
Theosakamg committed Jun 26, 2017
1 parent 61c685d commit 1908f2d
Show file tree
Hide file tree
Showing 14 changed files with 1,201 additions and 133 deletions.
8 changes: 8 additions & 0 deletions rcljava/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ set(${PROJECT_NAME}_sources
"src/main/java/org/ros2/rcljava/node/NativeNode.java"
"src/main/java/org/ros2/rcljava/node/Node.java"

"src/main/java/org/ros2/rcljava/executor/AnyExecutable.java"
"src/main/java/org/ros2/rcljava/executor/BaseThreadedExecutor.java"
"src/main/java/org/ros2/rcljava/executor/MemoryStrategy.java"
"src/main/java/org/ros2/rcljava/executor/MultiThreadedExecutor.java"
"src/main/java/org/ros2/rcljava/executor/NativeExecutor.java"
"src/main/java/org/ros2/rcljava/executor/SingleThreadedExecutor.java"
"src/main/java/org/ros2/rcljava/executor/ThreadedExecutor.java"

"src/main/java/org/ros2/rcljava/qos/policies/Durability.java"
"src/main/java/org/ros2/rcljava/qos/policies/History.java"
"src/main/java/org/ros2/rcljava/qos/policies/QoSPolicy.java"
Expand Down
47 changes: 24 additions & 23 deletions rcljava/src/main/java/org/ros2/rcljava/RCLJava.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,37 +115,37 @@ public abstract class RCLJava {
private static native long nativeCreateNodeHandle(String nodeName, String spaceName);

// Wait.h
private static native long nativeGetZeroInitializedWaitSet();
private static native void nativeWaitSetInit(
public static native long nativeGetZeroInitializedWaitSet();
public static native void nativeWaitSetInit(
long waitSetHandle,
int numberOfSubscriptions,
int numberOfGuardConditions,
int numberOfTimers,
int numberOfClients,
int numberOfServices);
private static native void nativeWaitSetClearSubscriptions(long waitSetHandle);
private static native void nativeWaitSetAddSubscription(long waitSetHandle, long subscriptionHandle);
private static native void nativeWaitSetClearServices(long waitSetHandle);
private static native void nativeWaitSetAddService(long waitSetHandle, long serviceHandle);
private static native void nativeWaitSetClearTimers(long waitSetHandle);
private static native void nativeWaitSetAddTimer(long waitSetHandle, long timerHandle);
private static native void nativeWaitSetClearClients(long waitSetHandle);
private static native void nativeWaitSetAddClient(long waitSetHandle, long clientHandle);
private static native void nativeWait(long waitSetHandle);
private static native Message nativeTake(long SubscriptionHandle, Class<?> msgType);
private static native void nativeWaitSetFini(long waitSetHandle);
private static native Object nativeTakeRequest(
public static native void nativeWaitSetClearSubscriptions(long waitSetHandle);
public static native void nativeWaitSetAddSubscription(long waitSetHandle, long subscriptionHandle);
public static native void nativeWaitSetClearServices(long waitSetHandle);
public static native void nativeWaitSetAddService(long waitSetHandle, long serviceHandle);
public static native void nativeWaitSetClearTimers(long waitSetHandle);
public static native void nativeWaitSetAddTimer(long waitSetHandle, long timerHandle);
public static native void nativeWaitSetClearClients(long waitSetHandle);
public static native void nativeWaitSetAddClient(long waitSetHandle, long clientHandle);
public static native void nativeWait(long waitSetHandle);
public static native Message nativeTake(long SubscriptionHandle, Class<?> msgType);
public static native void nativeWaitSetFini(long waitSetHandle);
public static native Object nativeTakeRequest(
long serviceHandle,
long requestFromJavaConverterHandle,
long requestToJavaConverterHandle,
Object requestMessage);
private static native void nativeSendServiceResponse(
public static native void nativeSendServiceResponse(
long serviceHandle,
Object header,
long responseFromJavaConverterHandle,
long responseToJavaConverterHandle,
Object responseMessage);
private static native Object nativeTakeResponse(
public static native Object nativeTakeResponse(
long clientHandle,
long responseFromJavaConverterHandle,
long responseToJavaConverterHandle,
Expand Down Expand Up @@ -331,13 +331,14 @@ public static void spinOnce(final Node node) {
node.getPublishers().size() > 0 ||
node.getServices().size() > 0 ||
node.getSubscriptions().size() > 0) {

long waitSetHandle = RCLJava.nativeGetZeroInitializedWaitSet();

RCLJava.nativeWaitSetInit(
waitSetHandle,
node.getSubscriptions().size(),
0,
node.getTimers().size(),
node.getWallTimers().size(),
node.getClients().size(),
node.getServices().size());

Expand All @@ -353,8 +354,8 @@ public static void spinOnce(final Node node) {
RCLJava.nativeWaitSetAddSubscription(waitSetHandle, nativeSubscription.getSubscriptionHandle());
}

for (WallTimer timer : node.getTimers()) {
nativeWaitSetAddTimer(waitSetHandle, timer.getHandle());
for (WallTimer timer : node.getWallTimers()) {
RCLJava.nativeWaitSetAddTimer(waitSetHandle, timer.getHandle());
}

for (Service<?> service : node.getServices()) {
Expand All @@ -381,18 +382,18 @@ public static void spinOnce(final Node node) {
}
}

for (WallTimer timer : node.getTimers()) {
for (WallTimer timer : node.getWallTimers()) {
if (timer.isReady()) {
timer.callTimer();
timer.getCallback().tick();
}
}

for (Service service : node.getServices()) {
long requestFromJavaConverterHandle = service.getRequestFromJavaConverterHandle();
long requestToJavaConverterHandle = service.getRequestToJavaConverterHandle();
long requestFromJavaConverterHandle = service.getRequestFromJavaConverterHandle();
long requestToJavaConverterHandle = service.getRequestToJavaConverterHandle();
long responseFromJavaConverterHandle = service.getResponseFromJavaConverterHandle();
long responseToJavaConverterHandle = service.getResponseToJavaConverterHandle();
long responseToJavaConverterHandle = service.getResponseToJavaConverterHandle();

Class<?> requestType = service.getRequestType();
Class<?> responseType = service.getResponseType();
Expand Down
34 changes: 34 additions & 0 deletions rcljava/src/main/java/org/ros2/rcljava/executor/AnyExecutable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* Copyright 2017 Mickael Gaillard <mick.gaillard@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.ros2.rcljava.executor;

import org.ros2.rcljava.node.Node;
import org.ros2.rcljava.node.service.Client;
import org.ros2.rcljava.node.service.Service;
import org.ros2.rcljava.node.topic.Subscription;
import org.ros2.rcljava.time.WallTimer;

public class AnyExecutable {

// Only one of the following pointers will be set.
protected Subscription<?> subscription = null;
protected WallTimer timer = null;
protected Service<?> service = null;
protected Client<?> client = null;

// These are used to keep the scope on the containing items
protected Node node = null;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/* Copyright 2017 Mickael Gaillard <mick.gaillard@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.ros2.rcljava.executor;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

import org.ros2.rcljava.RCLJava;
import org.ros2.rcljava.node.Node;

public abstract class BaseThreadedExecutor implements ThreadedExecutor {

protected Object mutex = new Object();
protected NativeExecutor baseExecutor;
protected volatile ExecutorService executorService;

protected BlockingQueue<Node> nodes = new LinkedBlockingQueue<Node>();

public BaseThreadedExecutor() {
this.baseExecutor = new NativeExecutor(this);
}

@Override
public void addNode(Node node) {
this.addNode(node, false);
}

@Override
public void addNode(Node node, boolean notify) {
if (!this.nodes.contains(node)) {
this.nodes.add(node);

if (notify) {

}
}
}

@Override
public void removeNode(Node node) {
this.removeNode(node, false);
}

@Override
public void removeNode(Node node, boolean notify) {
if (this.nodes.contains(node)) {
this.nodes.remove(node);

if (notify) {

}
}
}

@Override
public void spinSome() {
AnyExecutable anyExecutable = this.baseExecutor.getNextExecutable();
while (RCLJava.ok() && anyExecutable != null) {
this.baseExecutor.executeAnyExecutable(anyExecutable);
anyExecutable = this.baseExecutor.getNextExecutable(0);
}

}

@Override
public void spinOnce(long timeout) {
AnyExecutable anyExecutable = this.baseExecutor.getNextExecutable(timeout);

if (anyExecutable != null) {
this.baseExecutor.executeAnyExecutable(anyExecutable);
}
}

@Override
public void spinNodeOnce(Node node, long timeout) {
this.addNode(node, false);
this.spinOnce(timeout);
this.removeNode(node, false);
}

@Override
public void spinNodeSome(Node node) {
this.addNode(node, false);
this.spinSome();
this.removeNode(node, false);
}

@Override
public void run() {
while (RCLJava.ok()) {
synchronized (mutex) {
this.spinOnce(0);
}
}
}

@Override
public void cancel() {
if (!this.executorService.isShutdown()) {
this.executorService.shutdownNow();
}
}

}
115 changes: 115 additions & 0 deletions rcljava/src/main/java/org/ros2/rcljava/executor/MemoryStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/* Copyright 2017 Mickael Gaillard <mick.gaillard@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.ros2.rcljava.executor;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.ros2.rcljava.node.Node;
import org.ros2.rcljava.node.service.Client;
import org.ros2.rcljava.node.service.Service;
import org.ros2.rcljava.node.topic.Subscription;
import org.ros2.rcljava.time.WallTimer;

public class MemoryStrategy {

public ConcurrentLinkedQueue<Subscription<?>> subscriptionHandles = new ConcurrentLinkedQueue<Subscription<?>>();
public ConcurrentLinkedQueue<WallTimer> timerHandles = new ConcurrentLinkedQueue<WallTimer>();
public ConcurrentLinkedQueue<Service<?>> serviceHandles = new ConcurrentLinkedQueue<Service<?>>();
public ConcurrentLinkedQueue<Client<?>> clientHandles = new ConcurrentLinkedQueue<Client<?>>();

public MemoryStrategy() {

}

public void clearHandles() {
this.subscriptionHandles.clear();
this.timerHandles.clear();
this.serviceHandles.clear();
this.clientHandles.clear();
}


public boolean collectEntities(final BlockingQueue<Node> nodes) {
for (Node node : nodes) {
for (Subscription<?> subscription : node.getSubscriptions()) {
this.subscriptionHandles.add(subscription);
}

for (Service<?> service : node.getServices()) {
this.serviceHandles.add(service);
}

for (Client<?> client : node.getClients()) {
this.clientHandles.add(client);
}

for (WallTimer timer : node.getWallTimers()) {
this.timerHandles.add(timer);
}
}

return false;
}

/**
* Provide a newly initialized AnyExecutable object.
* @return fresh executable.
*/
public AnyExecutable instantiateNextExecutable() {
return new AnyExecutable();
}

public void getNextSubscription(AnyExecutable anyExecutable, BlockingQueue<Node> nodes) {
Subscription<?> subscription = this.subscriptionHandles.poll();
if (subscription != null) {
anyExecutable.subscription = subscription;
}
}

public void getNextService(AnyExecutable anyExecutable, BlockingQueue<Node> nodes) {
Service<?> service = this.serviceHandles.poll();
if (service != null) {
anyExecutable.service = service;
}
}

public void getNextClient(AnyExecutable anyExecutable, BlockingQueue<Node> nodes) {
Client<?> client = this.clientHandles.poll();
if (client != null) {
anyExecutable.client = client;
}
}

public int numberOfReadySubscriptions() {
return this.subscriptionHandles.size();
}

public int numberOfReadyServices() {
return this.serviceHandles.size();
}

public int numberOfReadyClients() {
return this.clientHandles.size();
}

public int numberOfReadyTimers() {
return this.timerHandles.size();
}
// public int number_of_guard_conditions() {
// return -1;
// }

}
Loading

0 comments on commit 1908f2d

Please sign in to comment.