Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add normalize_parameters and evaluate_paramters #192

Merged
merged 13 commits into from
Mar 6, 2019
86 changes: 16 additions & 70 deletions launch_ros/launch_ros/actions/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

"""Module for the Node action."""

from collections.abc import Mapping
import logging
import os
import pathlib
Expand All @@ -25,19 +26,20 @@
from typing import Text # noqa: F401
from typing import Tuple

from launch import Substitution
from launch.action import Action
from launch.actions import ExecuteProcess
from launch.launch_context import LaunchContext
from launch.some_substitutions_type import SomeSubstitutionsType
from launch.some_substitutions_type import SomeSubstitutionsType_types_tuple
from launch.substitutions import LocalSubstitution
from launch.utilities import ensure_argument_type
from launch.utilities import normalize_to_list_of_substitutions
from launch.utilities import perform_substitutions

from launch_ros.parameters_type import SomeParameters
from launch_ros.remap_rule_type import SomeRemapRules
from launch_ros.substitutions import ExecutableInPackage
from launch_ros.utilities import evaluate_parameters
from launch_ros.utilities import normalize_parameters
from launch_ros.utilities import normalize_remap_rules

from rclpy.validate_namespace import validate_namespace
Expand All @@ -57,7 +59,7 @@ def __init__(
node_executable: SomeSubstitutionsType,
node_name: Optional[SomeSubstitutionsType] = None,
node_namespace: Optional[SomeSubstitutionsType] = None,
parameters: Optional[List[SomeSubstitutionsType]] = None,
parameters: Optional[SomeParameters] = None,
remappings: Optional[SomeRemapRules] = None,
arguments: Optional[Iterable[SomeSubstitutionsType]] = None,
**kwargs
Expand Down Expand Up @@ -136,11 +138,9 @@ def __init__(
ensure_argument_type(parameters, (list), 'parameters', 'Node')
# All elements in the list are paths to files with parameters (or substitutions that
# evaluate to paths), or dictionaries of parameters (fields can be substitutions).
parameter_types = list(SomeSubstitutionsType_types_tuple) + [pathlib.Path, dict]
i = 0
for param in parameters:
ensure_argument_type(param, parameter_types, 'parameters[{}]'.format(i), 'Node')
if isinstance(param, dict) and node_name is None:
if isinstance(param, Mapping) and node_name is None:
raise RuntimeError(
'If a dictionary of parameters is specified, the node name must also be '
'specified. See https://github.com/ros2/launch/issues/139')
Expand All @@ -149,6 +149,7 @@ def __init__(
'ros_specific_arguments[{}]'.format(ros_args_index),
description='parameter {}'.format(i))]
ros_args_index += 1
normalized_params = normalize_parameters(parameters)
if remappings is not None:
i = 0
for remapping in normalize_remap_rules(remappings):
Expand All @@ -163,7 +164,7 @@ def __init__(
self.__node_executable = node_executable
self.__node_name = node_name
self.__node_namespace = node_namespace
self.__parameters = [] if parameters is None else parameters
self.__parameters = [] if parameters is None else normalized_params
self.__remappings = [] if remappings is None else remappings
self.__arguments = arguments

Expand All @@ -182,65 +183,11 @@ def node_name(self):
raise RuntimeError("cannot access 'node_name' before executing action")
return self.__final_node_name

def _create_params_file_from_dict(self, context, params):
def _create_params_file_from_dict(self, params):
with NamedTemporaryFile(mode='w', prefix='launch_params_', delete=False) as h:
param_file_path = h.name
# TODO(dhood): clean up generated parameter files.

def perform_substitution_if_applicable(context, var):
if isinstance(var, (int, float, str)):
# No substitution necessary.
return var
if isinstance(var, Substitution):
return perform_substitutions(context, normalize_to_list_of_substitutions(var))
if isinstance(var, tuple):
try:
return perform_substitutions(
context, normalize_to_list_of_substitutions(var))
except TypeError:
raise TypeError(
'Invalid element received in parameters dictionary '
'(not all tuple elements are Substitutions): {}'.format(var))
else:
raise TypeError(
'Unsupported type received in parameters dictionary: {}'
.format(type(var)))

def expand_dict(input_dict):
expanded_dict = {}
for k, v in input_dict.items():
# Key (parameter/group name) can only be a string/Substitutions that evaluates
# to a string.
expanded_key = perform_substitutions(
context, normalize_to_list_of_substitutions(k))
if isinstance(v, dict):
# Expand the nested dict.
expanded_value = expand_dict(v)
elif isinstance(v, list):
# Expand each element.
expanded_value = []
for e in v:
if isinstance(e, list):
raise TypeError(
'Nested lists are not supported for parameters: {} found in {}'
.format(e, v))
expanded_value.append(perform_substitution_if_applicable(context, e))
# Tuples are treated as Substitution(s) to be concatenated.
elif isinstance(v, tuple):
for e in v:
ensure_argument_type(
e, SomeSubstitutionsType_types_tuple,
'parameter dictionary tuple entry', 'Node')
expanded_value = perform_substitutions(
context, normalize_to_list_of_substitutions(v))
else:
expanded_value = perform_substitution_if_applicable(context, v)
expanded_dict[expanded_key] = expanded_value
return expanded_dict

expanded_dict = expand_dict(params)
param_dict = {
self.__expanded_node_name: {'ros__parameters': expanded_dict}}
param_dict = {self.__expanded_node_name: {'ros__parameters': params}}
if self.__expanded_node_namespace:
param_dict = {self.__expanded_node_namespace: param_dict}
yaml.dump(param_dict, h, default_flow_style=False)
Expand Down Expand Up @@ -281,15 +228,14 @@ def _perform_substitutions(self, context: LaunchContext) -> None:
# expand parameters too
if self.__parameters is not None:
self.__expanded_parameter_files = []
for params in self.__parameters:
evaluated_parameters = evaluate_parameters(context, self.__parameters)
for params in evaluated_parameters:
if isinstance(params, dict):
param_file_path = self._create_params_file_from_dict(context, params)
param_file_path = self._create_params_file_from_dict(params)
elif isinstance(params, pathlib.Path):
param_file_path = str(params)
else:
if isinstance(params, pathlib.Path):
param_file_path = str(params)
else:
param_file_path = perform_substitutions(
context, normalize_to_list_of_substitutions(params))
raise RuntimeError('invalid normalized parameters {}'.format(repr(params)))
if not os.path.isfile(param_file_path):
_logger.warn(
'Parameter file path is not a file: {}'.format(param_file_path))
Expand Down
64 changes: 64 additions & 0 deletions launch_ros/launch_ros/parameters_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module for ROS parameter types."""

import pathlib

from typing import Any
from typing import Dict
from typing import Mapping
from typing import Sequence
from typing import Union

from launch.some_substitutions_type import SomeSubstitutionsType
from launch.substitution import Substitution


# Parameter value types used below
_SingleValueType = Union[str, int, float, bool]
_MultiValueType = Union[
Sequence[str], Sequence[int], Sequence[float], Sequence[bool], bytes]

SomeParameterFile = Union[SomeSubstitutionsType, pathlib.Path]
SomeParameterName = Sequence[Union[Substitution, str]]
SomeParameterValue = Union[SomeSubstitutionsType,
Sequence[SomeSubstitutionsType],
_SingleValueType,
_MultiValueType]

# TODO(sloretz) Recursive type when mypy supports them python/mypy#731
_SomeParametersDict = Mapping[SomeParameterName, Any]
SomeParametersDict = Mapping[SomeParameterName, Union[SomeParameterValue, _SomeParametersDict]]

# Paths to yaml files with parameters, or dictionaries of parameters, or pairs of
# parameter names and values
SomeParameters = Sequence[Union[SomeParameterFile, Mapping[SomeParameterName, SomeParameterValue]]]

ParameterFile = Sequence[Substitution]
ParameterName = Sequence[Substitution]
ParameterValue = Union[Sequence[Substitution],
Sequence[Sequence[Substitution]],
_SingleValueType,
_MultiValueType]

# Normalized (flattened to avoid having a recursive type) parameter dict
ParametersDict = Dict[ParameterName, ParameterValue]

# Normalized parameters
Parameters = Sequence[Union[ParameterFile, ParametersDict]]

EvaluatedParameterValue = Union[_SingleValueType, _MultiValueType]
# Evaluated parameters: filenames or dictionary after substitutions have been evaluated
EvaluatedParameters = Sequence[Union[pathlib.Path, Dict[str, EvaluatedParameterValue]]]
4 changes: 4 additions & 0 deletions launch_ros/launch_ros/utilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
Descriptions are not executable and are immutable so they can be reused by launch entities.
"""

from .evaluate_parameters import evaluate_parameters
from .normalize_parameters import normalize_parameters
from .normalize_remap_rule import normalize_remap_rule
from .normalize_remap_rule import normalize_remap_rules


__all__ = [
'evaluate_parameters',
'normalize_parameters',
'normalize_remap_rule',
'normalize_remap_rules',
]
86 changes: 86 additions & 0 deletions launch_ros/launch_ros/utilities/evaluate_parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module with utility for evaluating parametesr in a launch context."""


from collections.abc import Mapping
from collections.abc import Sequence
import pathlib
from typing import cast
from typing import Dict
from typing import List
from typing import Optional # noqa
from typing import Union

from launch.launch_context import LaunchContext
from launch.substitution import Substitution
from launch.utilities import ensure_argument_type
from launch.utilities import perform_substitutions

from ..parameters_type import EvaluatedParameters
from ..parameters_type import EvaluatedParameterValue # noqa
from ..parameters_type import Parameters


def evaluate_parameters(context: LaunchContext, parameters: Parameters) -> EvaluatedParameters:
"""
Evaluate substitutions to produce paths and name/value pairs.

The parameters must have been normalized with normalize_parameters() prior to calling this.

:param parameters: normalized parameters
:returns: values after evaluating lists of substitutions
"""
output_params = [] # type: List[Union[pathlib.Path, Dict[str, EvaluatedParameterValue]]]
for param in parameters:
# If it's a list of substitutions then evaluate them to a string and return a pathlib.Path
if isinstance(param, tuple) and len(param) and isinstance(param[0], Substitution):
# Evaluate a list of Substitution to a file path
output_params.append(pathlib.Path(perform_substitutions(context, list(param))))
elif isinstance(param, Mapping):
# It's a list of name/value pairs
output_dict = {} # type: Dict[str, EvaluatedParameterValue]
for name, value in param.items():
if not isinstance(name, tuple):
raise TypeError('Expecting tuple of substitutions got {}'.format(repr(name)))
evaluated_name = perform_substitutions(context, list(name)) # type: str
evaluated_value = None # type: Optional[EvaluatedParameterValue]

if isinstance(value, tuple) and len(value):
if isinstance(value[0], Substitution):
# Value is a list of substitutions, so perform them to make a string
evaluated_value = perform_substitutions(context, list(value))
elif isinstance(value[0], Sequence):
# Value is an array of a list of substitutions
output_subvalue = [] # List[str]
for subvalue in value:
output_subvalue.append(perform_substitutions(context, list(subvalue)))
evaluated_value = tuple(output_subvalue)
else:
# Value is an array of the same type, so nothing to evaluate.
output_value = []
target_type = type(value[0])
for i, subvalue in enumerate(value):
output_value.append(target_type(subvalue))
evaluated_value = tuple(output_value)
else:
# Value is a singular type, so nothing to evaluate
ensure_argument_type(value, (float, int, str, bool, bytes), 'value')
evaluated_value = cast(Union[float, int, str, bool, bytes], value)
if evaluated_value is None:
raise TypeError('given unnormalized parameters %r, %r' % (name, value))
output_dict[evaluated_name] = evaluated_value
output_params.append(output_dict)
return tuple(output_params)
Loading