Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/services timestamps #545

Merged
merged 7 commits into from Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions rclpy/rclpy/executors.py
Expand Up @@ -334,9 +334,10 @@ def _take_client(self, client):
return _rclpy.rclpy_take_response(capsule, client.srv_type.Response)

async def _execute_client(self, client, seq_and_response):
sequence, response = seq_and_response
if sequence is not None:
header, response = seq_and_response
if header is not None:
try:
sequence = _rclpy.rclpy_service_info_get_sequence_number(header)
future = client._pending_requests[sequence]
except KeyError:
# The request was cancelled
Expand Down
109 changes: 98 additions & 11 deletions rclpy/src/rclpy/_rclpy.c
Expand Up @@ -2666,7 +2666,14 @@ rclpy_send_response(PyObject * Py_UNUSED(self), PyObject * args)
rmw_request_id_t * header = PyCapsule_GetPointer(
pyheader, "rmw_request_id_t");
if (!header) {
return NULL;
rmw_service_info_t * info_header = PyCapsule_GetPointer(
pyheader, "rmw_service_info_t");
if (!info_header) {
return NULL;
}
// clear previous error, work-around worked
PyErr_Restore(NULL, NULL, NULL);
header = &(info_header->request_id);
}

destroy_ros_message_signature * destroy_ros_message = NULL;
Expand Down Expand Up @@ -3306,12 +3313,12 @@ rclpy_take_request(PyObject * Py_UNUSED(self), PyObject * args)
return NULL;
}

rmw_request_id_t * header = PyMem_Malloc(sizeof(rmw_request_id_t));
rmw_service_info_t * header = PyMem_Malloc(sizeof(rmw_service_info_t));
if (!header) {
PyErr_Format(PyExc_MemoryError, "Failed to allocate memory for request header");
return NULL;
}
rcl_ret_t ret = rcl_take_request(&(srv->service), header, taken_request);
rcl_ret_t ret = rcl_take_request_with_info(&(srv->service), header, taken_request);

if (ret != RCL_RET_OK && ret != RCL_RET_SERVICE_TAKE_FAILED) {
PyErr_Format(
Expand All @@ -3337,7 +3344,7 @@ rclpy_take_request(PyObject * Py_UNUSED(self), PyObject * args)
Py_DECREF(pytaken_request);
return NULL;
}
PyObject * pyheader = PyCapsule_New(header, "rmw_request_id_t", NULL);
PyObject * pyheader = PyCapsule_New(header, "rmw_service_info_t", NULL);
if (!pyheader) {
PyMem_Free(header);
Py_DECREF(pytaken_request);
Expand Down Expand Up @@ -3383,18 +3390,17 @@ rclpy_take_response(PyObject * Py_UNUSED(self), PyObject * args)
return NULL;
}

rmw_request_id_t * header = PyMem_Malloc(sizeof(rmw_request_id_t));
rmw_service_info_t * header = PyMem_Malloc(sizeof(rmw_service_info_t));
if (!header) {
PyErr_Format(PyExc_MemoryError, "Failed to allocate memory for response header");
return NULL;
}
rcl_ret_t ret = rcl_take_response(&(client->client), header, taken_response);
int64_t sequence = header->sequence_number;
PyMem_Free(header);
rcl_ret_t ret = rcl_take_response_with_info(&(client->client), header, taken_response);

// Create the tuple to return
PyObject * pytuple = PyTuple_New(2);
if (!pytuple) {
PyMem_Free(header);
return NULL;
}

Expand All @@ -3404,16 +3410,18 @@ rclpy_take_response(PyObject * Py_UNUSED(self), PyObject * args)
if (!pytaken_response) {
// the function has set the Python error
Py_DECREF(pytuple);
PyMem_Free(header);
return NULL;
}

PyObject * pysequence = PyLong_FromLongLong(sequence);
if (!pysequence) {
PyObject * pyheader = PyCapsule_New(header, "rmw_service_info_t", NULL);
if (!pyheader) {
Py_DECREF(pytaken_response);
Py_DECREF(pytuple);
PyMem_Free(header);
return NULL;
}
PyTuple_SET_ITEM(pytuple, 0, pysequence);
PyTuple_SET_ITEM(pytuple, 0, pyheader);
PyTuple_SET_ITEM(pytuple, 1, pytaken_response);
return pytuple;
}
Expand Down Expand Up @@ -5247,6 +5255,70 @@ rclpy_deserialize(PyObject * Py_UNUSED(self), PyObject * args)
return pydeserialized_ros_msg;
}

static rmw_service_info_t *
_service_info_from_args(PyObject * args)
{
PyObject * pyservice_info;

if (!PyArg_ParseTuple(args, "O", &pyservice_info)) {
return NULL;
}

return PyCapsule_GetPointer(pyservice_info, "rmw_service_info_t");
}

/// Retrieves the sequence number from a rmw_service_info_t capsule
/**
* Raises RuntimeError on failure.
*
* \param[in] pyservice_info Capsule pointing to the rmw_service_info_t
* \return the sequence number as a long
*/
static PyObject *
rclpy_service_info_get_sequence_number(PyObject * Py_UNUSED(self), PyObject * args)
{
rmw_service_info_t * service_info = _service_info_from_args(args);
if (service_info == NULL) {
return NULL;
}
return PyLong_FromLongLong(service_info->request_id.sequence_number);
}

/// Retrieves the source timestamp number from a rmw_service_info_t capsule
/**
* Raises RuntimeError on failure.
*
* \param[in] pyservice_info Capsule pointing to the rmw_service_info_t
* \return the source timestamps as a long
*/
static PyObject *
rclpy_service_info_get_source_timestamp(PyObject * Py_UNUSED(self), PyObject * args)
{
rmw_service_info_t * service_info = _service_info_from_args(args);
if (service_info == NULL) {
return NULL;
}
return PyLong_FromLongLong(service_info->source_timestamp);
}


/// Retrieves the received timestsamp number from a rmw_service_info_t capsule
/**
* Raises RuntimeError on failure.
*
* \param[in] pyservice_info Capsule pointing to the rmw_service_info_t
* \return the receive timestamp as a long
*/
static PyObject *
rclpy_service_info_get_received_timestamp(PyObject * Py_UNUSED(self), PyObject * args)
{
rmw_service_info_t * service_info = _service_info_from_args(args);
if (service_info == NULL) {
return NULL;
}
return PyLong_FromLongLong(service_info->received_timestamp);
}

/// Define the public methods of this module
static PyMethodDef rclpy_methods[] = {
{
Expand Down Expand Up @@ -5630,6 +5702,21 @@ static PyMethodDef rclpy_methods[] = {
"rclpy_deserialize", rclpy_deserialize, METH_VARARGS,
"Deserialize a ROS message."
},
{
"rclpy_service_info_get_sequence_number", rclpy_service_info_get_sequence_number,
METH_VARARGS,
"Retrieve sequence number from service_info"
},
{
"rclpy_service_info_get_source_timestamp", rclpy_service_info_get_source_timestamp,
METH_VARARGS,
"Retrieve source timestamp from service_info"
},
{
"rclpy_service_info_get_received_timestamp", rclpy_service_info_get_received_timestamp,
METH_VARARGS,
"Retrieve received timestamp from service_info"
},

{NULL, NULL, 0, NULL} /* sentinel */
};
Expand Down
25 changes: 25 additions & 0 deletions rclpy/test/test_client.py
Expand Up @@ -18,6 +18,7 @@
from rcl_interfaces.srv import GetParameters
import rclpy
import rclpy.executors
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy


# TODO(sloretz) Reduce fudge once wait_for_service uses node graph events
Expand Down Expand Up @@ -90,6 +91,30 @@ def test_concurrent_calls_to_service(self):
self.node.destroy_client(cli)
self.node.destroy_service(srv)

def test_service_timestamps(self):
cli = self.node.create_client(GetParameters, 'get/parameters')
srv = self.node.create_service(
GetParameters, 'get/parameters',
lambda request, response: response)
try:
self.assertTrue(cli.wait_for_service(timeout_sec=20))
cli.call_async(GetParameters.Request())
cycle_count = 0
while cycle_count < 5:
with srv.handle as capsule:
result = _rclpy.rclpy_take_request(capsule, srv.srv_type.Request)
if result is not None:
request, header = result
source_timestamp = _rclpy.rclpy_service_info_get_source_timestamp(header)
self.assertNotEqual(0, source_timestamp)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test always fails with Connext: see #615.

return
else:
time.sleep(0.1)
self.fail("Did not get a request in time")
Karsten1987 marked this conversation as resolved.
Show resolved Hide resolved
finally:
self.node.destroy_client(cli)
self.node.destroy_service(srv)

def test_different_type_raises(self):
cli = self.node.create_client(GetParameters, 'get/parameters')
srv = self.node.create_service(
Expand Down