Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ros2 param #95

Merged
merged 7 commits into from
Jun 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions ros2param/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
<name>ros2param</name>
<version>0.4.0</version>
<description>
The param command for ROS 2 command line tools.
</description>
<maintainer email="dthomas@osrfoundation.org">Dirk Thomas</maintainer>
<license>Apache License 2.0</license>

<depend>rcl_interfaces</depend>
<depend>rclpy</depend>
<depend>ros2cli</depend>

<exec_depend>python3-yaml</exec_depend>
<exec_depend>ros2node</exec_depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
105 changes: 105 additions & 0 deletions ros2param/ros2param/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright 2018 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from rcl_interfaces.msg import ParameterType
from rcl_interfaces.msg import ParameterValue
from rcl_interfaces.srv import GetParameters
from rcl_interfaces.srv import SetParameters
import rclpy


def get_parameter_value(*, string_value):
"""Guess the desired type of the parameter based on the string value."""
value = ParameterValue()
if string_value.lower() in ('true', 'false'):
value.type = ParameterType.PARAMETER_BOOL
value.bool_value = string_value.lower() == 'true'
elif _is_integer(string_value):
value.type = ParameterType.PARAMETER_INTEGER
value.integer_value = int(string_value)
elif _is_float(string_value):
value.type = ParameterType.PARAMETER_DOUBLE
value.double_value = float(string_value)
else:
value.type = ParameterType.PARAMETER_STRING
value.string_value = string_value
return value


def _is_integer(string_value):
try:
integer_value = int(string_value)
except ValueError:
return False
return str(integer_value) == string_value


def _is_float(string_value):
try:
float(string_value)
except ValueError:
return False
return True


def call_get_parameters(*, node, node_name, parameter_names):
# create client
client = node.create_client(
GetParameters,
'/{node_name}/get_parameters'.format_map(locals()))

# call as soon as ready
ready = client.wait_for_service(timeout_sec=5.0)
if not ready:
raise RuntimeError('Wait for service timed out')

request = GetParameters.Request()
request.names = parameter_names
future = client.call_async(request)
rclpy.spin_until_future_complete(node, future)

# handle response
response = future.result()
if response is None:
e = future.exception()
raise RuntimeError(
'Exception while calling service of node '
"'{args.node_name}': {e}".format_map(locals()))
return response


def call_set_parameters(*, node, node_name, parameters):
# create client
client = node.create_client(
SetParameters,
'/{node_name}/set_parameters'.format_map(locals()))

# call as soon as ready
ready = client.wait_for_service(timeout_sec=5.0)
if not ready:
raise RuntimeError('Wait for service timed out')

request = SetParameters.Request()
request.parameters = parameters
future = client.call_async(request)
rclpy.spin_until_future_complete(node, future)

# handle response
response = future.result()
if response is None:
e = future.exception()
raise RuntimeError(
'Exception while calling service of node '
"'{args.node_name}': {e}".format_map(locals()))
return response
Empty file.
40 changes: 40 additions & 0 deletions ros2param/ros2param/command/param.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2018 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ros2cli.command import add_subparsers
from ros2cli.command import CommandExtension
from ros2cli.verb import get_verb_extensions


class ParamCommand(CommandExtension):
"""Various param related sub-commands."""

def add_arguments(self, parser, cli_name):
self._subparser = parser

# get verb extensions and let them add their arguments
verb_extensions = get_verb_extensions('ros2param.verb')
add_subparsers(
parser, cli_name, '_verb', verb_extensions, required=False)

def main(self, *, parser, args):
if not hasattr(args, '_verb'):
# in case no verb was passed
self._subparser.print_help()
return 0

extension = getattr(args, '_verb')

# call the verb's main method
return extension.main(args=args)
44 changes: 44 additions & 0 deletions ros2param/ros2param/verb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2018 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ros2cli.plugin_system import PLUGIN_SYSTEM_VERSION
from ros2cli.plugin_system import satisfies_version


class VerbExtension:
"""
The extension point for 'param' verb extensions.

The following properties must be defined:
* `NAME` (will be set to the entry point name)

The following methods must be defined:
* `main`

The following methods can be defined:
* `add_arguments`
"""

NAME = None
EXTENSION_POINT_VERSION = '0.1'

def __init__(self):
super(VerbExtension, self).__init__()
satisfies_version(PLUGIN_SYSTEM_VERSION, '^0.1')

def add_arguments(self, parser, cli_name):
pass

def main(self, *, args):
raise NotImplementedError()
74 changes: 74 additions & 0 deletions ros2param/ros2param/verb/delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright 2018 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys

from rcl_interfaces.msg import Parameter
from rcl_interfaces.msg import ParameterType
from rcl_interfaces.msg import ParameterValue
from ros2cli.node.direct import DirectNode
from ros2cli.node.strategy import add_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2node.api import get_node_names
from ros2node.api import NodeNameCompleter
from ros2param.api import call_set_parameters
from ros2param.verb import VerbExtension


class DeleteVerb(VerbExtension):
"""Delete parameter."""

def add_arguments(self, parser, cli_name): # noqa: D102
add_arguments(parser)
arg = parser.add_argument(
'node_name', help='Name of the ROS node')
arg.completer = NodeNameCompleter(
include_hidden_nodes_key='include_hidden_nodes')
parser.add_argument(
'--include-hidden-nodes', action='store_true',
help='Consider hidden nodes as well')
parser.add_argument(
'name', help='Name of the parameter')

def main(self, *, args): # noqa: D102
with NodeStrategy(args) as node:
node_names = get_node_names(
node=node, include_hidden_nodes=args.include_hidden_nodes)

if args.node_name not in node_names:
return 'Node not found'

with DirectNode(args) as node:
parameter = Parameter()
Parameter.name = args.name
value = ParameterValue()
value.type = ParameterType.PARAMETER_NOT_SET
parameter.value = value

response = call_set_parameters(
node=node, node_name=args.node_name, parameters=[parameter])

# output response
assert len(response.results) == 1
result = response.results[0]
if result.successful:
msg = 'Deleted parameter successfully'
if result.reason:
msg += ': ' + result.reason
print(msg)
else:
msg = 'Deleting parameter failed'
if result.reason:
msg += ': ' + result.reason
print(msg, file=sys.stderr)
99 changes: 99 additions & 0 deletions ros2param/ros2param/verb/get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright 2018 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from rcl_interfaces.msg import ParameterType
from ros2cli.node.direct import DirectNode
from ros2cli.node.strategy import add_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2node.api import get_node_names
from ros2node.api import NodeNameCompleter
from ros2param.api import call_get_parameters
from ros2param.verb import VerbExtension


class GetVerb(VerbExtension):
"""Get parameter."""

def add_arguments(self, parser, cli_name): # noqa: D102
add_arguments(parser)
arg = parser.add_argument(
'node_name', help='Name of the ROS node')
arg.completer = NodeNameCompleter(
include_hidden_nodes_key='include_hidden_nodes')
parser.add_argument(
'--include-hidden-nodes', action='store_true',
help='Consider hidden nodes as well')
parser.add_argument(
'name', help='Name of the parameter')
parser.add_argument(
'--hide-type', action='store_true',
help='Hide the type information')

def main(self, *, args): # noqa: D102
with NodeStrategy(args) as node:
node_names = get_node_names(
node=node, include_hidden_nodes=args.include_hidden_nodes)

if args.node_name not in node_names:
return 'Node not found'

with DirectNode(args) as node:
response = call_get_parameters(
node=node, node_name=args.node_name,
parameter_names=[args.name])

assert len(response.values) <= 1

# requested parameter not set
if not response.values:
return 'Parameter not set'

# extract type specific value
pvalue = response.values[0]
if pvalue.type == ParameterType.PARAMETER_BOOL:
label = 'Boolean value is:'
value = pvalue.bool_value
elif pvalue.type == ParameterType.PARAMETER_INTEGER:
label = 'Integer value is:'
value = pvalue.integer_value
elif pvalue.type == ParameterType.PARAMETER_DOUBLE:
label = 'Double value is:'
value = pvalue.double_value
elif pvalue.type == ParameterType.PARAMETER_STRING:
label = 'String value is:'
value = pvalue.string_value
elif pvalue.type == ParameterType.PARAMETER_BYTE_ARRAY:
label = 'Byte values are:'
value = pvalue.byte_array_value
elif pvalue.type == ParameterType.PARAMETER_BOOL_ARRAY:
label = 'Boolean values are:'
value = pvalue.bool_array_value
elif pvalue.type == ParameterType.PARAMETER_INTEGER_ARRAY:
label = 'Integer values are:'
value = pvalue.integer_array_value
elif pvalue.type == ParameterType.PARAMETER_DOUBLE_ARRAY:
label = 'Double values are:'
value = pvalue.double_array_value
elif pvalue.type == ParameterType.PARAMETER_STRING_ARRAY:
label = 'String values are:'
value = pvalue.string_array_value
else:
return "Unknown parameter type '{pvalue.type}'" \
.format_map(locals())

# output response
if not args.hide_type:
print(label, value)
else:
print(value)
Loading