From 1f34ceef61ac4dfe9f8ba0a5297a14bd1c8f3892 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 28 Oct 2021 17:41:09 -0300 Subject: [PATCH 1/3] Add method to prune pending requests older than a timestamp Signed-off-by: Ivan Santiago Paunovic --- .../org/ros2/rcljava/client/ClientImpl.java | 58 ++++++++++++++++--- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java b/rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java index c8496fc2..84ec6f6f 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java +++ b/rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java @@ -22,6 +22,7 @@ import java.lang.Long; import java.util.AbstractMap; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -52,7 +53,25 @@ public class ClientImpl implements Client { private final WeakReference nodeReference; private long handle; private final String serviceName; - private Map> pendingRequests; + + private class PendingRequest + { + public Consumer callback; + public ResponseFuture future; + public long requestTimestamp; + + public PendingRequest( + final Consumer callback, + final ResponseFuture future, + final long requestTimestamp) + { + this.callback = callback; + this.future = future; + this.requestTimestamp = requestTimestamp; + } + } + + private Map pendingRequests; private final ServiceDefinition serviceDefinition; @@ -66,7 +85,7 @@ public ClientImpl( this.handle = handle; this.serviceName = serviceName; this.serviceDefinition = serviceDefinition; - this.pendingRequests = new HashMap>(); + this.pendingRequests = new HashMap(); } public ServiceDefinition getServiceDefinition() { @@ -88,8 +107,7 @@ public void accept(Future input) {} request.getDestructorInstance(), request); ResponseFuture future = new ResponseFuture(sequenceNumber); - Map.Entry entry = - new AbstractMap.SimpleEntry(callback, future); + PendingRequest entry = new PendingRequest(callback, future, System.nanoTime()); pendingRequests.put(sequenceNumber, entry); return future; } @@ -98,20 +116,44 @@ public void accept(Future input) {} public final boolean removePendingRequest(ResponseFuture future) { synchronized (pendingRequests) { - Map.Entry entry = pendingRequests.remove( + PendingRequest entry = pendingRequests.remove( future.getRequestSequenceNumber()); return entry != null; } } + public final long + prunePendingRequests() { + synchronized (pendingRequests) { + long size = pendingRequests.size(); + pendingRequests.clear(); + return size; + } + } + + public final long + prunePendingRequestsOlderThan(long nanoTime) { + synchronized (pendingRequests) { + Iterator> iter = pendingRequests.entrySet().iterator(); + long removed = 0; + while(iter.hasNext()) { + if(iter.next().getValue().requestTimestamp < nanoTime) { + iter.remove(); + ++removed; + } + } + return removed; + } + } + public final void handleResponse( final RMWRequestId header, final U response) { synchronized (pendingRequests) { long sequenceNumber = header.sequenceNumber; - Map.Entry entry = pendingRequests.remove(sequenceNumber); + PendingRequest entry = pendingRequests.remove(sequenceNumber); if (entry != null) { - Consumer callback = entry.getKey(); - ResponseFuture future = entry.getValue(); + Consumer callback = entry.callback; + ResponseFuture future = entry.future; future.set(response); callback.accept(future); return; From ede69ec0ebb4f8cf227c590f5057f80b7c4af8e4 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Thu, 28 Oct 2021 18:11:58 -0300 Subject: [PATCH 2/3] Add test cases + fixes Signed-off-by: Ivan Santiago Paunovic --- .../java/org/ros2/rcljava/client/Client.java | 4 +++ .../org/ros2/rcljava/client/ClientImpl.java | 4 +-- .../org/ros2/rcljava/client/ClientTest.java | 33 +++++++++++++++++++ 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/rcljava/src/main/java/org/ros2/rcljava/client/Client.java b/rcljava/src/main/java/org/ros2/rcljava/client/Client.java index 8043cf6f..b76074fa 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/client/Client.java +++ b/rcljava/src/main/java/org/ros2/rcljava/client/Client.java @@ -38,6 +38,10 @@ ResponseFuture asy boolean removePendingRequest(ResponseFuture future); + long prunePendingRequests(); + + long prunePendingRequestsOlderThan(long nanoTime); + /** * Check if the service server is available. * diff --git a/rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java b/rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java index 84ec6f6f..507dd71e 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java +++ b/rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java @@ -122,7 +122,7 @@ public void accept(Future input) {} } } - public final long + public final long prunePendingRequests() { synchronized (pendingRequests) { long size = pendingRequests.size(); @@ -131,7 +131,7 @@ public void accept(Future input) {} } } - public final long + public final long prunePendingRequestsOlderThan(long nanoTime) { synchronized (pendingRequests) { Iterator> iter = pendingRequests.entrySet().iterator(); diff --git a/rcljava/src/test/java/org/ros2/rcljava/client/ClientTest.java b/rcljava/src/test/java/org/ros2/rcljava/client/ClientTest.java index 03b4a558..d7a15bbd 100644 --- a/rcljava/src/test/java/org/ros2/rcljava/client/ClientTest.java +++ b/rcljava/src/test/java/org/ros2/rcljava/client/ClientTest.java @@ -37,6 +37,7 @@ import org.ros2.rcljava.RCLJava; import org.ros2.rcljava.concurrent.RCLFuture; +import org.ros2.rcljava.consumers.Consumer; import org.ros2.rcljava.consumers.TriConsumer; import org.ros2.rcljava.executors.Executor; import org.ros2.rcljava.node.Node; @@ -157,4 +158,36 @@ public final void testRemovePendingRequest() throws Exception { assertTrue(client.removePendingRequest(responseFuture)); assertFalse(client.removePendingRequest(responseFuture)); } + + @Test + public final void testPrunePendingRequestsOlderThan() throws Exception { + RCLFuture consumerFuture = + new RCLFuture(); + + TestClientConsumer clientConsumer = new TestClientConsumer(consumerFuture); + + Service service = node.createService( + rcljava.srv.AddTwoInts.class, "add_two_ints", clientConsumer); + + rcljava.srv.AddTwoInts_Request request = new rcljava.srv.AddTwoInts_Request(); + request.setA(2); + request.setB(3); + + Client client = + node.createClient( + rcljava.srv.AddTwoInts.class, "add_two_ints"); + + assertTrue(client.waitForService(Duration.ofSeconds(10))); + + long oldNanoTime = System.nanoTime(); + ResponseFuture responseFuture = client.asyncSendRequest( + request, + new Consumer>() { + public final void accept(final Future futureResponse) {} + }); + + assertEquals(0, client.prunePendingRequestsOlderThan(oldNanoTime)); + assertEquals(1, client.prunePendingRequestsOlderThan(System.nanoTime())); + assertEquals(0, client.prunePendingRequestsOlderThan(System.nanoTime())); + } } From 24fbf81821e3e33fbcc2fb0602ab5dc8bd1e6aab Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Mon, 1 Nov 2021 10:35:37 -0300 Subject: [PATCH 3/3] Add documentation Signed-off-by: Ivan Santiago Paunovic --- .../src/main/java/org/ros2/rcljava/client/Client.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/rcljava/src/main/java/org/ros2/rcljava/client/Client.java b/rcljava/src/main/java/org/ros2/rcljava/client/Client.java index b76074fa..2e7f8f95 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/client/Client.java +++ b/rcljava/src/main/java/org/ros2/rcljava/client/Client.java @@ -38,8 +38,18 @@ ResponseFuture asy boolean removePendingRequest(ResponseFuture future); + /** + * Remove old pending requests that this client has done. + * Future responses to removed requests will be ignored. + */ long prunePendingRequests(); + /** + * Remove old pending requests that where done before the specified time point. + * Future responses to removed requests will be ignored. + * + * @param nanoTime requests done before this timepoint will be removed. + */ long prunePendingRequestsOlderThan(long nanoTime); /**