Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rcljava/include/org_ros2_rcljava_node_NodeImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ JNIEXPORT void
JNICALL Java_org_ros2_rcljava_node_NodeImpl_nativeGetPublishersInfo(
JNIEnv *, jclass, jlong, jstring, jobject);

/*
* Class: org_ros2_rcljava_node_NodeImpl
* Method: nativeGetSubscriptionsInfo
* Signature: (JLjava/lang/String;Ljava/util/List;)V
*/
JNIEXPORT void
JNICALL Java_org_ros2_rcljava_node_NodeImpl_nativeGetSubscriptionsInfo(
JNIEnv *, jclass, jlong, jstring, jobject);

#ifdef __cplusplus
}
#endif
Expand Down
41 changes: 29 additions & 12 deletions rcljava/src/main/cpp/org_ros2_rcljava_node_NodeImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,10 @@ Java_org_ros2_rcljava_node_NodeImpl_nativeGetTopicNamesAndTypes(
}
}

JNIEXPORT void JNICALL
Java_org_ros2_rcljava_node_NodeImpl_nativeGetPublishersInfo(
JNIEnv * env, jclass, jlong handle, jstring jtopic_name, jobject jpublishers_info)
template<typename FunctorT>
void
get_endpoint_info_common(
JNIEnv * env, jlong handle, jstring jtopic_name, jobject jendpoints_info, FunctorT get_info)
{
rcl_node_t * node = reinterpret_cast<rcl_node_t *>(handle);
if (!node) {
Expand All @@ -323,7 +324,7 @@ Java_org_ros2_rcljava_node_NodeImpl_nativeGetPublishersInfo(
}

rcutils_allocator_t allocator = rcutils_get_default_allocator();
rcl_topic_endpoint_info_array_t publishers_info =
rcl_topic_endpoint_info_array_t endpoints_info =
rcl_get_zero_initialized_topic_endpoint_info_array();

const char * topic_name = env->GetStringUTFChars(jtopic_name, NULL);
Expand All @@ -333,26 +334,26 @@ Java_org_ros2_rcljava_node_NodeImpl_nativeGetPublishersInfo(
return;
}

rcl_ret_t ret = rcl_get_publishers_info_by_topic(
rcl_ret_t ret = get_info(
node,
&allocator,
topic_name,
false, // use ros mangling conventions
&publishers_info);
&endpoints_info);

env->ReleaseStringUTFChars(jtopic_name, topic_name);

RCLJAVA_COMMON_THROW_FROM_RCL(env, ret, "failed to get publisher info");
auto cleanup_info_array = rcpputils::make_scope_exit(
[info_ptr = &publishers_info, allocator_ptr = &allocator, env]() {
[info_ptr = &endpoints_info, allocator_ptr = &allocator, env]() {
rcl_ret_t ret = rcl_topic_endpoint_info_array_fini(info_ptr, allocator_ptr);
if (!env->ExceptionCheck() && RCL_RET_OK != ret) {
rcljava_throw_rclexception(env, ret, "failed to destroy rcl publisher info");
rcljava_throw_rclexception(env, ret, "failed to destroy rcl endpoints info");
}
}
);

jclass list_clazz = env->GetObjectClass(jpublishers_info);
jclass list_clazz = env->GetObjectClass(jendpoints_info);
jmethodID list_add_mid = env->GetMethodID(list_clazz, "add", "(Ljava/lang/Object;)Z");
RCLJAVA_COMMON_CHECK_FOR_EXCEPTION(env);
jclass endpoint_info_clazz = env->FindClass("org/ros2/rcljava/graph/EndpointInfo");
Expand All @@ -363,13 +364,29 @@ Java_org_ros2_rcljava_node_NodeImpl_nativeGetPublishersInfo(
endpoint_info_clazz, "nativeFromRCL", "(J)V");
RCLJAVA_COMMON_CHECK_FOR_EXCEPTION(env);

for (size_t i = 0; i < publishers_info.size; i++) {
for (size_t i = 0; i < endpoints_info.size; i++) {
jobject item = env->NewObject(endpoint_info_clazz, endpoint_info_init_mid);
RCLJAVA_COMMON_CHECK_FOR_EXCEPTION(env);
env->CallVoidMethod(item, endpoint_info_from_rcl_mid, &publishers_info.info_array[i]);
env->CallVoidMethod(item, endpoint_info_from_rcl_mid, &endpoints_info.info_array[i]);
RCLJAVA_COMMON_CHECK_FOR_EXCEPTION(env);
env->CallBooleanMethod(jpublishers_info, list_add_mid, item);
env->CallBooleanMethod(jendpoints_info, list_add_mid, item);
RCLJAVA_COMMON_CHECK_FOR_EXCEPTION(env);
env->DeleteLocalRef(item);
}
}

JNIEXPORT void JNICALL
Java_org_ros2_rcljava_node_NodeImpl_nativeGetPublishersInfo(
JNIEnv * env, jclass, jlong handle, jstring jtopic_name, jobject jpublishers_info)
{
get_endpoint_info_common(
env, handle, jtopic_name, jpublishers_info, rcl_get_publishers_info_by_topic);
}

JNIEXPORT void JNICALL
Java_org_ros2_rcljava_node_NodeImpl_nativeGetSubscriptionsInfo(
JNIEnv * env, jclass, jlong handle, jstring jtopic_name, jobject jsubscriptions_info)
{
get_endpoint_info_common(
env, handle, jtopic_name, jsubscriptions_info, rcl_get_subscriptions_info_by_topic);
}
12 changes: 12 additions & 0 deletions rcljava/src/main/java/org/ros2/rcljava/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -571,4 +571,16 @@ <T extends ServiceDefinition> Client<T> createClient(final Class<T> serviceType,
* passed topic.
*/
Collection<EndpointInfo> getPublishersInfo(final String topicName);

/**
* Get information of all subscriptions in a topic.
*
* The queried information includes the node that created the publisher, its qos, etc.
* For more info, see @{link EndpointInfo}.
*
* @param topicName The topic name of interest.
* @return A collection of `EndpointInfo` instances, describing all subscriptions in the
* passed topic.
*/
Collection<EndpointInfo> getSubscriptionsInfo(final String topicName);
}
9 changes: 9 additions & 0 deletions rcljava/src/main/java/org/ros2/rcljava/node/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -758,4 +758,13 @@ public final Collection<EndpointInfo> getPublishersInfo(final String topicName)

private native static final void nativeGetPublishersInfo(
final long handle, final String topicName, ArrayList<EndpointInfo> endpointInfo);

public final Collection<EndpointInfo> getSubscriptionsInfo(final String topicName) {
ArrayList<EndpointInfo> returnValue = new ArrayList();
nativeGetSubscriptionsInfo(this.handle, topicName, returnValue);
return returnValue;
}

private native static final void nativeGetSubscriptionsInfo(
final long handle, final String topicName, ArrayList<EndpointInfo> endpointInfo);
}
59 changes: 58 additions & 1 deletion rcljava/src/test/java/org/ros2/rcljava/node/NodeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -993,4 +993,61 @@ public void accept(final Collection<EndpointInfo> info) {
publisher.dispose();
publisher2.dispose();
}
}

@Test
public final void testGetSubscriptionsInfo() {
Subscription<rcljava.msg.UInt32> subscription = node.<rcljava.msg.UInt32>createSubscription(
rcljava.msg.UInt32.class, "test_get_subscriptions_info", new Consumer<rcljava.msg.UInt32>() {
public void accept(final rcljava.msg.UInt32 msg) {}
});
Subscription<rcljava.msg.UInt32> subscription2 = node.<rcljava.msg.UInt32>createSubscription(
rcljava.msg.UInt32.class, "test_get_subscriptions_info", new Consumer<rcljava.msg.UInt32>() {
public void accept(final rcljava.msg.UInt32 msg) {}
}, QoSProfile.sensorData());

Consumer<Collection<EndpointInfo>> validateEndpointInfo =
new Consumer<Collection<EndpointInfo>>() {
public void accept(final Collection<EndpointInfo> info) {
assertEquals(info.size(), 2);
Iterator<EndpointInfo> it = info.iterator();
EndpointInfo item = it.next();
assertEquals("test_node", item.nodeName);
assertEquals("/", item.nodeNamespace);
assertEquals("rcljava/msg/UInt32", item.topicType);
assertEquals(item.endpointType, EndpointInfo.EndpointType.SUBSCRIPTION);
assertEquals(item.qos.getReliability(), Reliability.RELIABLE);
item = it.next();
assertEquals("test_node", item.nodeName);
assertEquals("/", item.nodeNamespace);
assertEquals("rcljava/msg/UInt32", item.topicType);
assertEquals(item.endpointType, EndpointInfo.EndpointType.SUBSCRIPTION);
assertEquals(item.qos.getReliability(), Reliability.BEST_EFFORT);
assertFalse(it.hasNext());
}
};

long start = System.currentTimeMillis();
boolean ok = false;
Collection<EndpointInfo> subscriptionsInfo = null;
do {
subscriptionsInfo = node.getSubscriptionsInfo("/test_get_subscriptions_info");
try {
validateEndpointInfo.accept(subscriptionsInfo);
ok = true;
} catch (AssertionError err) {
// ignore here, it's going to be validated again at the end.
}
// TODO(ivanpauno): We could wait for the graph guard condition to be triggered if that
// would be available.
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException err) {
// ignore
}
} while (!ok && System.currentTimeMillis() < start + 1000);
assertNotNull(subscriptionsInfo);
validateEndpointInfo.accept(subscriptionsInfo);
subscription.dispose();
subscription2.dispose();
}
}