From 078ca012f28ba93e8cee9e2c05bd267ee1afdb3f Mon Sep 17 00:00:00 2001 From: Mickael Gaillard Date: Tue, 27 Jun 2017 09:32:52 +0200 Subject: [PATCH] Clear Executor stack --- rcljava/CMakeLists.txt | 2 + .../executor/BaseThreadedExecutor.java | 27 +- .../executor/MultiThreadedExecutor.java | 7 + .../ros2/rcljava/executor/NativeExecutor.java | 20 +- .../java/org/ros2/rcljava/ExecutorTest.java | 411 ++++++++++++++++++ .../test/java/org/ros2/rcljava/NodeTest.java | 266 ------------ 6 files changed, 446 insertions(+), 287 deletions(-) create mode 100644 rcljava/src/test/java/org/ros2/rcljava/ExecutorTest.java diff --git a/rcljava/CMakeLists.txt b/rcljava/CMakeLists.txt index c30fa4d..b517761 100644 --- a/rcljava/CMakeLists.txt +++ b/rcljava/CMakeLists.txt @@ -235,6 +235,7 @@ if(BUILD_TESTING) PROPERTY "JAR_FILE") set(${PROJECT_NAME}_test_sources + "src/test/java/org/ros2/rcljava/ExecutorTest.java" "src/test/java/org/ros2/rcljava/GraphNameTest.java" "src/test/java/org/ros2/rcljava/NodeTest.java" # "src/test/java/org/ros2/rcljava/MessageTest.java" @@ -246,6 +247,7 @@ if(BUILD_TESTING) ) set(${PROJECT_NAME}_testsuites + "org.ros2.rcljava.ExecutorTest" "org.ros2.rcljava.GraphNameTest" "org.ros2.rcljava.NodeTest" # "org.ros2.rcljava.MessageTest" diff --git a/rcljava/src/main/java/org/ros2/rcljava/executor/BaseThreadedExecutor.java b/rcljava/src/main/java/org/ros2/rcljava/executor/BaseThreadedExecutor.java index 7bb6974..1291436 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executor/BaseThreadedExecutor.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executor/BaseThreadedExecutor.java @@ -15,19 +15,19 @@ package org.ros2.rcljava.executor; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ExecutorService; import org.ros2.rcljava.RCLJava; import org.ros2.rcljava.node.Node; public abstract class BaseThreadedExecutor implements ThreadedExecutor { - protected Object mutex = new Object(); - protected NativeExecutor baseExecutor; + protected final Object mutex = new Object(); + protected final NativeExecutor baseExecutor; protected volatile ExecutorService executorService; - protected BlockingQueue nodes = new LinkedBlockingQueue(); + protected final BlockingQueue nodes = new LinkedBlockingQueue(); public BaseThreadedExecutor() { this.baseExecutor = new NativeExecutor(this); @@ -69,18 +69,25 @@ public void removeNode(Node node, boolean notify) { public void spinSome() { AnyExecutable anyExecutable = this.baseExecutor.getNextExecutable(); while (RCLJava.ok() && anyExecutable != null) { - this.baseExecutor.executeAnyExecutable(anyExecutable); + BaseThreadedExecutor.executeAnyExecutable(anyExecutable); anyExecutable = this.baseExecutor.getNextExecutable(0); } - } @Override public void spinOnce(long timeout) { - AnyExecutable anyExecutable = this.baseExecutor.getNextExecutable(timeout); + final AnyExecutable anyExecutable = this.baseExecutor.getNextExecutable(timeout); if (anyExecutable != null) { - this.baseExecutor.executeAnyExecutable(anyExecutable); + BaseThreadedExecutor.executeAnyExecutable(anyExecutable); + } + } + + private static void executeAnyExecutable(final AnyExecutable anyExecutable) { + try { + NativeExecutor.executeAnyExecutable(anyExecutable); + } catch (Exception e) { + e.printStackTrace(); } } @@ -101,9 +108,7 @@ public void spinNodeSome(Node node) { @Override public void run() { while (RCLJava.ok()) { - synchronized (mutex) { - this.spinOnce(0); - } + this.spinOnce(0); } } diff --git a/rcljava/src/main/java/org/ros2/rcljava/executor/MultiThreadedExecutor.java b/rcljava/src/main/java/org/ros2/rcljava/executor/MultiThreadedExecutor.java index 7e72a33..ea999c6 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executor/MultiThreadedExecutor.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executor/MultiThreadedExecutor.java @@ -15,6 +15,7 @@ package org.ros2.rcljava.executor; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * Pool-multiple-threaded executor implementation @@ -48,6 +49,7 @@ public MultiThreadedExecutor(final int numberOfThreads) { * Pool-multiple threaded implementation of spin. * This function will block until work comes in, execute it, and keep blocking. * It will only be interrupt by a CTRL-C (managed by the global signal handler). + * @throws InterruptedException */ @Override public void spin() { @@ -59,6 +61,11 @@ public void spin() { if (!this.executorService.isShutdown()) { this.executorService.shutdown(); + try { + this.executorService.awaitTermination(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } diff --git a/rcljava/src/main/java/org/ros2/rcljava/executor/NativeExecutor.java b/rcljava/src/main/java/org/ros2/rcljava/executor/NativeExecutor.java index 735fe08..098ca33 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/executor/NativeExecutor.java +++ b/rcljava/src/main/java/org/ros2/rcljava/executor/NativeExecutor.java @@ -161,7 +161,7 @@ public AnyExecutable getNextExecutable() { * @param timeout * @return */ - public AnyExecutable getNextExecutable(long timeout) { + public synchronized AnyExecutable getNextExecutable(long timeout) { AnyExecutable anyExecutable = this.getNextReadyExecutable(); if (anyExecutable == null) { @@ -176,26 +176,26 @@ public AnyExecutable getNextExecutable(long timeout) { * Execute any type from Memory. * @param anyExecutable */ - public void executeAnyExecutable(final AnyExecutable anyExecutable) { + public static void executeAnyExecutable(final AnyExecutable anyExecutable) { if (anyExecutable == null) { return; } if (anyExecutable.timer != null) { - this.executeTimer(anyExecutable.timer); + NativeExecutor.executeTimer(anyExecutable.timer); } if (anyExecutable.subscription != null) { - this.executeSubscription(anyExecutable.subscription); + NativeExecutor.executeSubscription(anyExecutable.subscription); } if (anyExecutable.service != null) { - this.executeService(anyExecutable.service); + NativeExecutor.executeService(anyExecutable.service); } if (anyExecutable.client != null) { - this.executeClient(anyExecutable.client); + NativeExecutor.executeClient(anyExecutable.client); } } @@ -203,7 +203,7 @@ public void executeAnyExecutable(final AnyExecutable anyExecutable) { * Execute Timer from Memory. * @param timer to execute. */ - private void executeTimer(final WallTimer timer) { + private static void executeTimer(final WallTimer timer) { timer.callTimer(); timer.getCallback().tick(); } @@ -213,7 +213,7 @@ private void executeTimer(final WallTimer timer) { * @param subscription to execute. */ @SuppressWarnings({ "unchecked", "rawtypes" }) - private void executeSubscription(final Subscription subscription) { + private static void executeSubscription(final Subscription subscription) { NativeSubscription nativeSubscription = (NativeSubscription ) subscription; Message message = RCLJava.nativeTake(nativeSubscription.getSubscriptionHandle(), nativeSubscription.getMessageType()); if (message != null) { @@ -226,7 +226,7 @@ private void executeSubscription(final Subscription subscription) { * @param service to execute. */ @SuppressWarnings({ "unchecked", "rawtypes" }) - private void executeService(final Service service) { + private static void executeService(final Service service) { Class requestType = service.getRequestType(); Class responseType = service.getResponseType(); @@ -267,7 +267,7 @@ private void executeService(final Service service) { } @SuppressWarnings("rawtypes") - private void executeClient(Client client) { + private static void executeClient(Client client) { } } diff --git a/rcljava/src/test/java/org/ros2/rcljava/ExecutorTest.java b/rcljava/src/test/java/org/ros2/rcljava/ExecutorTest.java new file mode 100644 index 0000000..9878ab2 --- /dev/null +++ b/rcljava/src/test/java/org/ros2/rcljava/ExecutorTest.java @@ -0,0 +1,411 @@ +/* Copyright 2017 Mickael Gaillard + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.ros2.rcljava; + +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.ros2.rcljava.executor.MultiThreadedExecutor; +import org.ros2.rcljava.executor.SingleThreadedExecutor; +import org.ros2.rcljava.executor.ThreadedExecutor; + +import org.ros2.rcljava.node.Node; +import org.ros2.rcljava.node.service.RCLFuture; +import org.ros2.rcljava.node.topic.NativePublisher; +import org.ros2.rcljava.node.topic.NativeSubscription; +import org.ros2.rcljava.node.topic.SubscriptionCallback; + +import org.ros2.rcljava.internal.message.Message; +import std_msgs.msg.UInt32; + +public class ExecutorTest { + + public class TestConsumer implements SubscriptionCallback { + private final RCLFuture future; + + TestConsumer(final RCLFuture future) { + this.future = future; + } + + public final void dispatch(final T msg) { + if (!this.future.isDone()) { + this.future.set(msg); + } + } + } + + public class TestConsumerFail implements SubscriptionCallback { + private final RCLFuture future; + + TestConsumerFail(final RCLFuture future) { + this.future = future; + } + + public final void dispatch(final T msg) { + if (!this.future.isDone()) { + this.future.set(msg); + throw new NullPointerException(); + } + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + + @Test + public final void testLinkMultipleProcess() throws Exception { + boolean test = true; + + try { + RCLJava.rclJavaInit(); + ThreadedExecutor executor = new MultiThreadedExecutor(); + + final Node publisherNode = RCLJava.createNode("publisher_node"); + final Node subscriptionNodeOne = RCLJava.createNode("subscription_node_one"); + final Node subscriptionNodeTwo = RCLJava.createNode("subscription_node_two"); + + NativePublisher publisher = (NativePublisher) publisherNode.createPublisher( + UInt32.class, "test_topic_multiple"); + + final RCLFuture futureOne = new RCLFuture(executor); + + NativeSubscription subscriptionOne = (NativeSubscription) subscriptionNodeOne.createSubscription( + UInt32.class, "test_topic_multiple", new TestConsumer(futureOne)); + + final RCLFuture futureTwo = new RCLFuture(executor); + + NativeSubscription subscriptionTwo = (NativeSubscription) subscriptionNodeTwo.createSubscription( + UInt32.class, "test_topic_multiple", new TestConsumer(futureTwo)); + + UInt32 msg = new UInt32(); + msg.setData(54321); + + executor.addNode(publisherNode); + executor.addNode(subscriptionNodeOne); + executor.addNode(subscriptionNodeTwo); + +// executor.spin(); + + while (RCLJava.ok() && !(futureOne.isDone() && futureTwo.isDone())) { + publisher.publish(msg); + executor.spinOnce(0); + } + + UInt32 valueOne = futureOne.get(); + assertEquals(54321, valueOne.getData()); + + UInt32 valueTwo = futureTwo.get(); + assertEquals(54321, valueTwo.getData()); + + executor.removeNode(subscriptionNodeTwo); + executor.removeNode(subscriptionNodeOne); + executor.removeNode(publisherNode); + executor.cancel(); + + publisher.dispose(); + subscriptionOne.dispose(); + subscriptionTwo.dispose(); + + publisherNode.dispose(); + subscriptionNodeOne.dispose(); + subscriptionNodeTwo.dispose(); + } catch (Exception e) { + test = false; + } finally { + RCLJava.shutdown(); + } + + Assert.assertTrue("Expected Runtime error.", test); + } + + @Test + public final void testSeparateMultipleProcess() throws Exception { + boolean test = true; + + try { + RCLJava.rclJavaInit(); + ThreadedExecutor executor = new MultiThreadedExecutor(); + + final Node publisherNode = RCLJava.createNode("publisher_node"); + final Node subscriptionNodeOne = RCLJava.createNode("subscription_node_one"); + final Node subscriptionNodeTwo = RCLJava.createNode("subscription_node_two"); + + NativePublisher publisher = (NativePublisher) publisherNode.createPublisher( + UInt32.class, "test_topic_multiple"); + + final RCLFuture futureOne = new RCLFuture(executor); + + NativeSubscription subscriptionOne = (NativeSubscription) subscriptionNodeOne.createSubscription( + UInt32.class, "test_topic_multiple", new TestConsumer(futureOne)); + + final RCLFuture futureTwo = new RCLFuture(executor); + + NativeSubscription subscriptionTwo = (NativeSubscription) subscriptionNodeTwo.createSubscription( + UInt32.class, "test_topic_multiple", new TestConsumer(futureTwo)); + + UInt32 msg = new UInt32(); + msg.setData(54321); + + executor.addNode(publisherNode); + executor.addNode(subscriptionNodeOne); + executor.addNode(subscriptionNodeTwo); + + executor.spin(); + + while (RCLJava.ok() && !(futureOne.isDone() && futureTwo.isDone())) { + publisher.publish(msg); + } + + UInt32 valueOne = futureOne.get(); + assertEquals(54321, valueOne.getData()); + + UInt32 valueTwo = futureTwo.get(); + assertEquals(54321, valueTwo.getData()); + + executor.removeNode(subscriptionNodeTwo); + executor.removeNode(subscriptionNodeOne); + executor.removeNode(publisherNode); + executor.cancel(); + + publisher.dispose(); + subscriptionOne.dispose(); + subscriptionTwo.dispose(); + + publisherNode.dispose(); + subscriptionNodeOne.dispose(); + subscriptionNodeTwo.dispose(); + } catch (Exception e) { + test = false; + } finally { + RCLJava.shutdown(); + } + + Assert.assertTrue("Expected Runtime error.", test); + } + + @Test + public final void testSeparateSingleProcess() throws Exception { + boolean test = true; + + try { + RCLJava.rclJavaInit(); + ThreadedExecutor executor = new SingleThreadedExecutor(); + + final Node publisherNode = RCLJava.createNode("publisher_node"); + final Node subscriptionNodeOne = RCLJava.createNode("subscription_node_one"); + final Node subscriptionNodeTwo = RCLJava.createNode("subscription_node_two"); + + NativePublisher publisher = (NativePublisher) publisherNode.createPublisher( + UInt32.class, "test_topic_single"); + + final RCLFuture futureOne = new RCLFuture(executor); + + NativeSubscription subscriptionOne = (NativeSubscription) subscriptionNodeOne.createSubscription( + UInt32.class, "test_topic_single", new TestConsumer(futureOne)); + + final RCLFuture futureTwo = new RCLFuture(executor); + + NativeSubscription subscriptionTwo = (NativeSubscription) subscriptionNodeTwo.createSubscription( + UInt32.class, "test_topic_single", new TestConsumer(futureTwo)); + + UInt32 msg = new UInt32(); + msg.setData(54321); + + executor.addNode(publisherNode); + executor.addNode(subscriptionNodeOne); + executor.addNode(subscriptionNodeTwo); + + executor.spin(); + + while (RCLJava.ok() && !(futureOne.isDone() && futureTwo.isDone())) { + publisher.publish(msg); + } + + UInt32 valueOne = futureOne.get(); + assertEquals(54321, valueOne.getData()); + + UInt32 valueTwo = futureTwo.get(); + assertEquals(54321, valueTwo.getData()); + + executor.removeNode(subscriptionNodeTwo); + executor.removeNode(subscriptionNodeOne); + executor.removeNode(publisherNode); + executor.cancel(); + + publisher.dispose(); + subscriptionOne.dispose(); + subscriptionTwo.dispose(); + + publisherNode.dispose(); + subscriptionNodeOne.dispose(); + subscriptionNodeTwo.dispose(); + } catch (Exception e) { + test = false; + } finally { + RCLJava.shutdown(); + } + + Assert.assertTrue("Expected Runtime error.", test); + } + + @Test + public final void testLinkSingleProcess() throws Exception { + boolean test = true; + + try { + RCLJava.rclJavaInit(); + ThreadedExecutor executor = new SingleThreadedExecutor(); + + final Node publisherNode = RCLJava.createNode("publisher_node"); + final Node subscriptionNodeOne = RCLJava.createNode("subscription_node_one"); + final Node subscriptionNodeTwo = RCLJava.createNode("subscription_node_two"); + + NativePublisher publisher = (NativePublisher) publisherNode.createPublisher( + UInt32.class, "test_topic_single"); + + final RCLFuture futureOne = new RCLFuture(executor); + + NativeSubscription subscriptionOne = (NativeSubscription) subscriptionNodeOne.createSubscription( + UInt32.class, "test_topic_single", new TestConsumer(futureOne)); + + final RCLFuture futureTwo = new RCLFuture(executor); + + NativeSubscription subscriptionTwo = (NativeSubscription) subscriptionNodeTwo.createSubscription( + UInt32.class, "test_topic_single", new TestConsumer(futureTwo)); + + UInt32 msg = new UInt32(); + msg.setData(54321); + + executor.addNode(publisherNode); + executor.addNode(subscriptionNodeOne); + executor.addNode(subscriptionNodeTwo); + + while (RCLJava.ok() && !(futureOne.isDone() && futureTwo.isDone())) { + publisher.publish(msg); + executor.spinOnce(0); + } + + UInt32 valueOne = futureOne.get(); + assertEquals(54321, valueOne.getData()); + + UInt32 valueTwo = futureTwo.get(); + assertEquals(54321, valueTwo.getData()); + + executor.removeNode(subscriptionNodeTwo); + executor.removeNode(subscriptionNodeOne); + executor.removeNode(publisherNode); + executor.cancel(); + + publisher.dispose(); + subscriptionOne.dispose(); + subscriptionTwo.dispose(); + + publisherNode.dispose(); + subscriptionNodeOne.dispose(); + subscriptionNodeTwo.dispose(); + } catch (Exception e) { + test = false; + } finally { + RCLJava.shutdown(); + } + + Assert.assertTrue("Expected Runtime error.", test); + } + + @Test + public final void testSeparateSingleProcessFail() throws Exception { + boolean test = true; + + try { + RCLJava.rclJavaInit(); + ThreadedExecutor executor = new SingleThreadedExecutor(); + + final Node publisherNode = RCLJava.createNode("publisher_node"); + final Node subscriptionNodeOne = RCLJava.createNode("subscription_node_one"); + final Node subscriptionNodeTwo = RCLJava.createNode("subscription_node_two"); + + NativePublisher publisher = (NativePublisher) publisherNode.createPublisher( + UInt32.class, "test_topic_single"); + + final RCLFuture futureOne = new RCLFuture(executor); + + NativeSubscription subscriptionOne = (NativeSubscription) subscriptionNodeOne.createSubscription( + UInt32.class, "test_topic_single", new TestConsumer(futureOne)); + + final RCLFuture futureTwo = new RCLFuture(executor); + + NativeSubscription subscriptionTwo = (NativeSubscription) subscriptionNodeTwo.createSubscription( + UInt32.class, "test_topic_single", new TestConsumerFail(futureTwo)); + + executor.addNode(publisherNode); + executor.addNode(subscriptionNodeOne); + executor.addNode(subscriptionNodeTwo); + + executor.spin(); + + UInt32 msg = new UInt32(); + msg.setData(54321); + + while (RCLJava.ok() && !(futureOne.isDone() && futureTwo.isDone())) { + publisher.publish(msg); + } + + UInt32 valueOne = futureOne.get(); + assertEquals(54321, valueOne.getData()); + + UInt32 valueTwo = futureTwo.get(); + assertEquals(54321, valueTwo.getData()); + + executor.removeNode(subscriptionNodeTwo); + executor.removeNode(subscriptionNodeOne); + executor.removeNode(publisherNode); + executor.cancel(); + + publisher.dispose(); + subscriptionOne.dispose(); + subscriptionTwo.dispose(); + + publisherNode.dispose(); + subscriptionNodeOne.dispose(); + subscriptionNodeTwo.dispose(); + } catch (Exception e) { + test = false; + } finally { + RCLJava.shutdown(); + } + + Assert.assertTrue("Expected Runtime error.", test); + } +} diff --git a/rcljava/src/test/java/org/ros2/rcljava/NodeTest.java b/rcljava/src/test/java/org/ros2/rcljava/NodeTest.java index 029f985..b956b4d 100644 --- a/rcljava/src/test/java/org/ros2/rcljava/NodeTest.java +++ b/rcljava/src/test/java/org/ros2/rcljava/NodeTest.java @@ -32,13 +32,10 @@ import org.ros2.rcljava.node.service.Service; import org.ros2.rcljava.node.service.ServiceCallback; import org.ros2.rcljava.node.topic.SubscriptionCallback; -import org.ros2.rcljava.node.topic.NativePublisher; -import org.ros2.rcljava.node.topic.NativeSubscription; import org.ros2.rcljava.node.topic.Publisher; import org.ros2.rcljava.node.topic.Subscription; import org.ros2.rcljava.qos.QoSProfile; -import std_msgs.msg.UInt32; import java.lang.ref.WeakReference; import java.util.HashMap; @@ -46,9 +43,6 @@ import java.util.Map.Entry; import org.ros2.rcljava.RCLJava; -import org.ros2.rcljava.executor.MultiThreadedExecutor; -import org.ros2.rcljava.executor.SingleThreadedExecutor; -import org.ros2.rcljava.executor.ThreadedExecutor; import org.ros2.rcljava.internal.message.Message; import org.ros2.rcljava.namespace.GraphName; @@ -395,264 +389,4 @@ public void testGraphGetTopics() { } //TODO Test Parameters - - @Test - public final void testPubUInt32LinkMultipleProcess() throws Exception { - boolean test = true; - - try { - RCLJava.rclJavaInit(); - ThreadedExecutor executor = new MultiThreadedExecutor(); - - final Node publisherNode = RCLJava.createNode("publisher_node"); - final Node subscriptionNodeOne = RCLJava.createNode("subscription_node_one"); - final Node subscriptionNodeTwo = RCLJava.createNode("subscription_node_two"); - - NativePublisher publisher = (NativePublisher) publisherNode.createPublisher( - UInt32.class, "test_topic_multiple"); - - final RCLFuture futureOne = new RCLFuture(executor); - - NativeSubscription subscriptionOne = (NativeSubscription) subscriptionNodeOne.createSubscription( - UInt32.class, "test_topic_multiple", new TestConsumer(futureOne)); - - final RCLFuture futureTwo = new RCLFuture(executor); - - NativeSubscription subscriptionTwo = (NativeSubscription) subscriptionNodeTwo.createSubscription( - UInt32.class, "test_topic_multiple", new TestConsumer(futureTwo)); - - UInt32 msg = new UInt32(); - msg.setData(54321); - - executor.addNode(publisherNode); - executor.addNode(subscriptionNodeOne); - executor.addNode(subscriptionNodeTwo); - -// executor.spin(); - - while (RCLJava.ok() && !(futureOne.isDone() && futureTwo.isDone())) { - publisher.publish(msg); - executor.spinOnce(0); - } - - UInt32 valueOne = futureOne.get(); - assertEquals(54321, valueOne.getData()); - - UInt32 valueTwo = futureTwo.get(); - assertEquals(54321, valueTwo.getData()); - - executor.removeNode(subscriptionNodeTwo); - executor.removeNode(subscriptionNodeOne); - executor.removeNode(publisherNode); - executor.cancel(); - - publisher.dispose(); - subscriptionOne.dispose(); - subscriptionTwo.dispose(); - - publisherNode.dispose(); - subscriptionNodeOne.dispose(); - subscriptionNodeTwo.dispose(); - } catch (Exception e) { - test = false; - } finally { - RCLJava.shutdown(); - } - - Assert.assertTrue("Expected Runtime error.", test); - } - - @Test - public final void testPubUInt32SeparateMultipleProcess() throws Exception { - boolean test = true; - - try { - RCLJava.rclJavaInit(); - ThreadedExecutor executor = new MultiThreadedExecutor(); - - final Node publisherNode = RCLJava.createNode("publisher_node"); - final Node subscriptionNodeOne = RCLJava.createNode("subscription_node_one"); - final Node subscriptionNodeTwo = RCLJava.createNode("subscription_node_two"); - - NativePublisher publisher = (NativePublisher) publisherNode.createPublisher( - UInt32.class, "test_topic_multiple"); - - final RCLFuture futureOne = new RCLFuture(executor); - - NativeSubscription subscriptionOne = (NativeSubscription) subscriptionNodeOne.createSubscription( - UInt32.class, "test_topic_multiple", new TestConsumer(futureOne)); - - final RCLFuture futureTwo = new RCLFuture(executor); - - NativeSubscription subscriptionTwo = (NativeSubscription) subscriptionNodeTwo.createSubscription( - UInt32.class, "test_topic_multiple", new TestConsumer(futureTwo)); - - UInt32 msg = new UInt32(); - msg.setData(54321); - - executor.addNode(publisherNode); - executor.addNode(subscriptionNodeOne); - executor.addNode(subscriptionNodeTwo); - - executor.spin(); - - while (RCLJava.ok() && !(futureOne.isDone() && futureTwo.isDone())) { - publisher.publish(msg); - } - - UInt32 valueOne = futureOne.get(); - assertEquals(54321, valueOne.getData()); - - UInt32 valueTwo = futureTwo.get(); - assertEquals(54321, valueTwo.getData()); - - executor.removeNode(subscriptionNodeTwo); - executor.removeNode(subscriptionNodeOne); - executor.removeNode(publisherNode); - executor.cancel(); - - publisher.dispose(); - subscriptionOne.dispose(); - subscriptionTwo.dispose(); - - publisherNode.dispose(); - subscriptionNodeOne.dispose(); - subscriptionNodeTwo.dispose(); - } catch (Exception e) { - test = false; - } finally { - RCLJava.shutdown(); - } - - Assert.assertTrue("Expected Runtime error.", test); - } - - @Test - public final void testPubUInt32SeparateSingleProcess() throws Exception { - boolean test = true; - - try { - RCLJava.rclJavaInit(); - ThreadedExecutor executor = new SingleThreadedExecutor(); - - final Node publisherNode = RCLJava.createNode("publisher_node"); - final Node subscriptionNodeOne = RCLJava.createNode("subscription_node_one"); - final Node subscriptionNodeTwo = RCLJava.createNode("subscription_node_two"); - - NativePublisher publisher = (NativePublisher) publisherNode.createPublisher( - UInt32.class, "test_topic_single"); - - final RCLFuture futureOne = new RCLFuture(executor); - - NativeSubscription subscriptionOne = (NativeSubscription) subscriptionNodeOne.createSubscription( - UInt32.class, "test_topic_single", new TestConsumer(futureOne)); - - final RCLFuture futureTwo = new RCLFuture(executor); - - NativeSubscription subscriptionTwo = (NativeSubscription) subscriptionNodeTwo.createSubscription( - UInt32.class, "test_topic_single", new TestConsumer(futureTwo)); - - UInt32 msg = new UInt32(); - msg.setData(54321); - - executor.addNode(publisherNode); - executor.addNode(subscriptionNodeOne); - executor.addNode(subscriptionNodeTwo); - - executor.spin(); - - while (RCLJava.ok() && !(futureOne.isDone() && futureTwo.isDone())) { - publisher.publish(msg); - } - - UInt32 valueOne = futureOne.get(); - assertEquals(54321, valueOne.getData()); - - UInt32 valueTwo = futureTwo.get(); - assertEquals(54321, valueTwo.getData()); - - executor.removeNode(subscriptionNodeTwo); - executor.removeNode(subscriptionNodeOne); - executor.removeNode(publisherNode); - executor.cancel(); - - publisher.dispose(); - subscriptionOne.dispose(); - subscriptionTwo.dispose(); - - publisherNode.dispose(); - subscriptionNodeOne.dispose(); - subscriptionNodeTwo.dispose(); - } catch (Exception e) { - test = false; - } finally { - RCLJava.shutdown(); - } - - Assert.assertTrue("Expected Runtime error.", test); - } - - @Test - public final void testPubUInt32LinkSingleProcess() throws Exception { - boolean test = true; - - try { - RCLJava.rclJavaInit(); - ThreadedExecutor executor = new SingleThreadedExecutor(); - - final Node publisherNode = RCLJava.createNode("publisher_node"); - final Node subscriptionNodeOne = RCLJava.createNode("subscription_node_one"); - final Node subscriptionNodeTwo = RCLJava.createNode("subscription_node_two"); - - NativePublisher publisher = (NativePublisher) publisherNode.createPublisher( - UInt32.class, "test_topic_single"); - - final RCLFuture futureOne = new RCLFuture(executor); - - NativeSubscription subscriptionOne = (NativeSubscription) subscriptionNodeOne.createSubscription( - UInt32.class, "test_topic_single", new TestConsumer(futureOne)); - - final RCLFuture futureTwo = new RCLFuture(executor); - - NativeSubscription subscriptionTwo = (NativeSubscription) subscriptionNodeTwo.createSubscription( - UInt32.class, "test_topic_single", new TestConsumer(futureTwo)); - - UInt32 msg = new UInt32(); - msg.setData(54321); - - executor.addNode(publisherNode); - executor.addNode(subscriptionNodeOne); - executor.addNode(subscriptionNodeTwo); - - while (RCLJava.ok() && !(futureOne.isDone() && futureTwo.isDone())) { - publisher.publish(msg); - executor.spinOnce(0); - } - - UInt32 valueOne = futureOne.get(); - assertEquals(54321, valueOne.getData()); - - UInt32 valueTwo = futureTwo.get(); - assertEquals(54321, valueTwo.getData()); - - executor.removeNode(subscriptionNodeTwo); - executor.removeNode(subscriptionNodeOne); - executor.removeNode(publisherNode); - executor.cancel(); - - publisher.dispose(); - subscriptionOne.dispose(); - subscriptionTwo.dispose(); - - publisherNode.dispose(); - subscriptionNodeOne.dispose(); - subscriptionNodeTwo.dispose(); - } catch (Exception e) { - test = false; - } finally { - RCLJava.shutdown(); - } - - Assert.assertTrue("Expected Runtime error.", test); - } }