diff --git a/demo_nodes_py/demo_nodes_py/events/matched_event_detect.py b/demo_nodes_py/demo_nodes_py/events/matched_event_detect.py
index 45bbd6db9..d72c00f45 100644
--- a/demo_nodes_py/demo_nodes_py/events/matched_event_detect.py
+++ b/demo_nodes_py/demo_nodes_py/events/matched_event_detect.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import Union
+
from example_interfaces.msg import String
import rclpy
from rclpy.event_handler import PublisherEventCallbacks
@@ -39,7 +41,7 @@
class MatchedEventDetectNode(Node):
- def __init__(self, pub_topic_name: String, sub_topic_name: String):
+ def __init__(self, pub_topic_name: str, sub_topic_name: str):
super().__init__('matched_event_detection_node')
self.__any_subscription_connected = False # used for publisher event
self.__any_publisher_connected = False # used for subscription event
@@ -49,10 +51,10 @@ def __init__(self, pub_topic_name: String, sub_topic_name: String):
event_callbacks=pub_event_callback)
sub_event_callback = SubscriptionEventCallbacks(matched=self.__sub_matched_event_callback)
- self.sub = self.create_subscription(String, sub_topic_name, lambda msg: ...,
+ self.sub = self.create_subscription(String, sub_topic_name, lambda msg: None,
10, event_callbacks=sub_event_callback)
- def __pub_matched_event_callback(self, info: QoSPublisherMatchedInfo):
+ def __pub_matched_event_callback(self, info: QoSPublisherMatchedInfo) -> None:
if self.__any_subscription_connected:
if info.current_count == 0:
self.get_logger().info('Last subscription is disconnected.')
@@ -69,7 +71,7 @@ def __pub_matched_event_callback(self, info: QoSPublisherMatchedInfo):
self.future.set_result(True)
- def __sub_matched_event_callback(self, info: QoSSubscriptionMatchedInfo):
+ def __sub_matched_event_callback(self, info: QoSSubscriptionMatchedInfo) -> None:
if self.__any_publisher_connected:
if info.current_count == 0:
self.get_logger().info('Last publisher is disconnected.')
@@ -86,25 +88,25 @@ def __sub_matched_event_callback(self, info: QoSSubscriptionMatchedInfo):
self.future.set_result(True)
- def get_future(self):
- self.future = Future()
+ def get_future(self) -> Future[bool]:
+ self.future: Future[bool] = Future()
return self.future
class MultiSubNode(Node):
- def __init__(self, topic_name: String):
+ def __init__(self, topic_name: str):
super().__init__('multi_sub_node')
- self.__subs = []
+ self.__subs: list[Subscription[String]] = []
self.__topic_name = topic_name
- def create_one_sub(self) -> Subscription:
+ def create_one_sub(self) -> Subscription[String]:
self.get_logger().info('Create a new subscription.')
- sub = self.create_subscription(String, self.__topic_name, lambda msg: ..., 10)
+ sub = self.create_subscription(String, self.__topic_name, lambda msg: None, 10)
self.__subs.append(sub)
return sub
- def destroy_one_sub(self, sub: Subscription):
+ def destroy_one_sub(self, sub: Subscription[String]) -> None:
if sub in self.__subs:
self.get_logger().info('Destroy a subscription.')
@@ -114,18 +116,18 @@ def destroy_one_sub(self, sub: Subscription):
class MultiPubNode(Node):
- def __init__(self, topic_name: String):
+ def __init__(self, topic_name: str):
super().__init__('multi_pub_node')
- self.__pubs = []
+ self.__pubs: list[Publisher[String]] = []
self.__topic_name = topic_name
- def create_one_pub(self) -> Publisher:
+ def create_one_pub(self) -> Publisher[String]:
self.get_logger().info('Create a new publisher.')
pub = self.create_publisher(String, self.__topic_name, 10)
self.__pubs.append(pub)
return pub
- def destroy_one_pub(self, pub: Publisher):
+ def destroy_one_pub(self, pub: Publisher[String]) -> None:
if pub in self.__pubs:
self.get_logger().info('Destroy a publisher.')
@@ -133,7 +135,7 @@ def destroy_one_pub(self, pub: Publisher):
self.destroy_publisher(pub)
-def main(args=None):
+def main(args: Union[list[str], None] = None) -> None:
try:
with rclpy.init(args=args):
topic_name_for_detect_pub_matched_event = 'pub_topic_matched_event_detect'
diff --git a/demo_nodes_py/demo_nodes_py/logging/use_logger_service.py b/demo_nodes_py/demo_nodes_py/logging/use_logger_service.py
index ce8aa22be..dbe7482b2 100644
--- a/demo_nodes_py/demo_nodes_py/logging/use_logger_service.py
+++ b/demo_nodes_py/demo_nodes_py/logging/use_logger_service.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# Copyright 2023 Sony Group Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,6 +16,8 @@
import argparse
import threading
import time
+from typing import Literal
+from typing import Union
from example_interfaces.msg import String
from rcl_interfaces.msg import LoggerLevel
@@ -66,12 +69,12 @@
class LoggerServiceNode(Node):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__('LoggerServiceNode', enable_logger_service=True)
self.child_logger = self.get_logger().get_child('child')
self.sub = self.create_subscription(String, 'output', self.callback, 10)
- def callback(self, msg):
+ def callback(self, msg: String) -> None:
self.get_logger().debug(msg.data + ' with DEBUG logger level.')
self.get_logger().info(msg.data + ' with INFO logger level.')
self.get_logger().warning(msg.data + ' with WARN logger level.')
@@ -84,7 +87,7 @@ def callback(self, msg):
class TestNode(Node):
- def __init__(self, remote_node_name):
+ def __init__(self, remote_node_name: str) -> None:
super().__init__('TestNode')
self.pub = self.create_publisher(String, 'output', 10)
self.logger_get_client = self.create_client(
@@ -93,7 +96,9 @@ def __init__(self, remote_node_name):
SetLoggerLevels, remote_node_name + '/set_logger_levels')
self._remote_node_name = remote_node_name
- def set_logger_level_on_remote_node(self, logger_level, logger_name='') -> bool:
+ def set_logger_level_on_remote_node(
+ self, logger_level: int, logger_name: str = '',
+ ) -> bool:
if not self._logger_set_client.service_is_ready():
return False
@@ -101,7 +106,7 @@ def set_logger_level_on_remote_node(self, logger_level, logger_name='') -> bool:
set_logger_level = LoggerLevel()
set_logger_level.name = logger_name if logger_name else self._remote_node_name
set_logger_level.level = logger_level
- request.levels.append(set_logger_level)
+ request.levels.append(set_logger_level) # type: ignore[attr-defined]
future = self._logger_set_client.call_async(request)
rclpy.spin_until_future_complete(self, future)
@@ -117,37 +122,42 @@ def set_logger_level_on_remote_node(self, logger_level, logger_name='') -> bool:
return True
- def get_logger_level_on_remote_node(self, logger_name=''):
+ def get_logger_level_on_remote_node(
+ self, logger_name: str = '',
+ ) -> Union[tuple[Literal[False], None], tuple[Literal[True], int]]:
if not self.logger_get_client.service_is_ready():
- return [False, None]
+ return False, None
request = GetLoggerLevels.Request()
- request.names.append(logger_name if logger_name else self._remote_node_name)
+ name = logger_name if logger_name else self._remote_node_name
+ request.names = [name]
future = self.logger_get_client.call_async(request)
rclpy.spin_until_future_complete(self, future)
ret_results = future.result()
if not ret_results:
- return [False, None]
+ return False, None
- return [True, ret_results.levels[0].level]
+ return True, ret_results.levels[0].level
-def get_logger_level_func(test_node, child_logger_name):
- ret, level = test_node.get_logger_level_on_remote_node()
+def get_logger_level_func(test_node: TestNode, child_logger_name: str) -> None:
+ results = test_node.get_logger_level_on_remote_node()
+ ret, level = results
if ret:
test_node.get_logger().info('Current logger level: ' + str(level))
else:
test_node.get_logger().error('Failed to get logger level via logger service !')
- ret, child_level = test_node.get_logger_level_on_remote_node(child_logger_name)
+ child_results = test_node.get_logger_level_on_remote_node(child_logger_name)
+ ret, child_level = child_results
if ret:
test_node.get_logger().info('Current child logger level: ' + str(child_level))
else:
test_node.get_logger().error('Failed to get child logger level via logger service !')
-def main(args=None):
+def main(args: Union[list[str], None] = None) -> None:
# Check for --service-only flag before ROS 2 consumes the arguments
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('--service-only', action='store_true', default=False)
diff --git a/demo_nodes_py/demo_nodes_py/parameters/async_param_client.py b/demo_nodes_py/demo_nodes_py/parameters/async_param_client.py
index 7b014a027..5a2f4ccbd 100644
--- a/demo_nodes_py/demo_nodes_py/parameters/async_param_client.py
+++ b/demo_nodes_py/demo_nodes_py/parameters/async_param_client.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# Copyright 2022 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -23,7 +24,7 @@
from rclpy.parameter_client import AsyncParameterClient
-def main(args=None):
+def main(args: list[str] | None = None) -> None:
try:
with rclpy.init():
node = rclpy.create_node('async_param_client')
@@ -44,63 +45,64 @@ def main(args=None):
rclpy.spin_until_future_complete(node, future)
set_parameters_result = future.result()
if set_parameters_result is not None:
- for i, v in enumerate(set_parameters_result.results):
+ for i, set_param_result in enumerate(set_parameters_result.results):
logger.info(f' {param_list[i]}:')
- logger.info(f' successful: {v.successful}')
+ logger.info(f' successful: {set_param_result.successful}')
else:
logger.error(f'Error setting parameters: {future.exception()}')
logger.info('Listing Parameters:')
- future = client.list_parameters(param_list, 10)
- rclpy.spin_until_future_complete(node, future)
- list_parameters_result = future.result()
+ list_future = client.list_parameters(param_list, 10)
+ rclpy.spin_until_future_complete(node, list_future)
+ list_parameters_result = list_future.result()
if list_parameters_result is not None:
for param_name in list_parameters_result.result.names:
logger.info(f' - {param_name}')
else:
- logger.error(f'Error listing parameters: {future.exception()}')
+ logger.error(f'Error listing parameters: {list_future.exception()}')
logger.info('Getting parameters:')
- future = client.get_parameters(param_list)
- rclpy.spin_until_future_complete(node, future)
- get_parameters_result = future.result()
+ get_future = client.get_parameters(param_list)
+ rclpy.spin_until_future_complete(node, get_future)
+ get_parameters_result = get_future.result()
if get_parameters_result is not None:
- for i, v in enumerate(get_parameters_result.values):
- logger.info(f' - {param_list[i]}: {parameter_value_to_python(v)}')
+ for i, get_param_value in enumerate(get_parameters_result.values):
+ logger.info(
+ f' - {param_list[i]}: {parameter_value_to_python(get_param_value)}')
else:
- logger.error(f'Error getting parameters: {future.exception()}')
+ logger.error(f'Error getting parameters: {get_future.exception()}')
logger.info('Loading parameters: ')
param_dir = get_package_share_directory('demo_nodes_py')
param_file_path = os.path.join(param_dir, 'params.yaml')
- future = client.load_parameter_file(param_file_path)
- rclpy.spin_until_future_complete(node, future)
- load_parameter_results = future.result()
+ load_future = client.load_parameter_file(param_file_path)
+ rclpy.spin_until_future_complete(node, load_future)
+ load_parameter_results = load_future.result()
if load_parameter_results is not None:
param_file_dict = parameter_dict_from_yaml_file(
param_file_path, False, target_nodes=['parameter_blackboard'])
- for i, v in enumerate(param_file_dict.keys()):
- logger.info(f' {v}:')
+ for i, k in enumerate(param_file_dict.keys()):
+ logger.info(f' {k}:')
logger.info(f' successful: '
f'{load_parameter_results.results[i].successful}')
logger.info(f' value: '
- f'{parameter_value_to_python(param_file_dict[v].value)}')
+ f'{parameter_value_to_python(param_file_dict[k].value)}')
else:
- logger.error(f'Error loading parameters: {future.exception()}')
+ logger.error(f'Error loading parameters: {load_future.exception()}')
logger.info('Deleting parameters: ')
params_to_delete = ['other_int_parameter', 'other_string_parameter',
'string_parameter']
- future = client.delete_parameters(params_to_delete)
- rclpy.spin_until_future_complete(node, future)
- delete_parameters_result = future.result()
+ del_future = client.delete_parameters(params_to_delete)
+ rclpy.spin_until_future_complete(node, del_future)
+ delete_parameters_result = del_future.result()
if delete_parameters_result is not None:
- for i, v in enumerate(delete_parameters_result.results):
+ for i, del_param_result in enumerate(delete_parameters_result.results):
logger.info(f' {params_to_delete[i]}:')
- logger.info(f' successful: {v.successful}')
- logger.info(f' reason: {v.reason}')
+ logger.info(f' successful: {del_param_result.successful}')
+ logger.info(f' reason: {del_param_result.reason}')
else:
- logger.error(f'Error deleting parameters: {future.exception()}')
+ logger.error(f'Error deleting parameters: {del_future.exception()}')
except (KeyboardInterrupt, ExternalShutdownException):
pass
diff --git a/demo_nodes_py/demo_nodes_py/parameters/set_parameters_callback.py b/demo_nodes_py/demo_nodes_py/parameters/set_parameters_callback.py
index 884a74721..8e83bef37 100644
--- a/demo_nodes_py/demo_nodes_py/parameters/set_parameters_callback.py
+++ b/demo_nodes_py/demo_nodes_py/parameters/set_parameters_callback.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# Copyright 2022 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import Union
+
from rcl_interfaces.msg import SetParametersResult
import rclpy
@@ -26,9 +29,11 @@
# node for demonstrating correct usage of pre_set, on_set
# and post_set parameter callbacks
+
+
class SetParametersCallback(Node):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__('set_parameters_callback')
# tracks 'param1' value
@@ -38,7 +43,9 @@ def __init__(self):
# setting another parameter from the callback is possible
# we expect the callback to be called for param2
- def pre_set_parameter_callback(parameter_list):
+ def pre_set_parameter_callback(
+ parameter_list: list[Parameter], # type: ignore[type-arg]
+ ) -> list[Parameter]: # type: ignore[type-arg]
modified_parameters = parameter_list.copy()
for param in parameter_list:
if param.name == 'param1':
@@ -47,7 +54,9 @@ def pre_set_parameter_callback(parameter_list):
return modified_parameters
# validation callback
- def on_set_parameter_callback(parameter_list):
+ def on_set_parameter_callback(
+ parameter_list: list[Parameter], # type: ignore[type-arg]
+ ) -> SetParametersResult:
result = SetParametersResult()
for param in parameter_list:
if param.name == 'param1':
@@ -60,7 +69,9 @@ def on_set_parameter_callback(parameter_list):
return result
# can change internally tracked class attributes
- def post_set_parameter_callback(parameter_list):
+ def post_set_parameter_callback(
+ parameter_list: list[Parameter], # type: ignore[type-arg]
+ ) -> None:
for param in parameter_list:
if param.name == 'param1':
self.internal_tracked_param_1 = param.value
@@ -72,7 +83,7 @@ def post_set_parameter_callback(parameter_list):
self.add_post_set_parameters_callback(post_set_parameter_callback)
-def main(args=None):
+def main(args: Union[list[str], None] = None) -> None:
try:
with rclpy.init(args=args):
node = SetParametersCallback()
diff --git a/demo_nodes_py/demo_nodes_py/services/add_two_ints_client.py b/demo_nodes_py/demo_nodes_py/services/add_two_ints_client.py
index 07ca330dc..34edabd07 100644
--- a/demo_nodes_py/demo_nodes_py/services/add_two_ints_client.py
+++ b/demo_nodes_py/demo_nodes_py/services/add_two_ints_client.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,13 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import Union
+
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.executors import ExternalShutdownException
-def main(args=None):
+def main(args: Union[list[str], None] = None) -> None:
try:
with rclpy.init(args=args):
node = rclpy.create_node('add_two_ints_client')
@@ -31,8 +34,10 @@ def main(args=None):
req.b = 3
future = cli.call_async(req)
rclpy.spin_until_future_complete(node, future)
- if future.result() is not None:
- node.get_logger().info('Result of add_two_ints: %d' % future.result().sum)
+ result = future.result()
+ if result is not None:
+ res_sum = result.sum
+ node.get_logger().info('Result of add_two_ints: %d' % res_sum)
else:
node.get_logger().error('Exception while calling service: %r' % future.exception())
except (KeyboardInterrupt, ExternalShutdownException):
diff --git a/demo_nodes_py/demo_nodes_py/services/add_two_ints_client_async.py b/demo_nodes_py/demo_nodes_py/services/add_two_ints_client_async.py
index 8412df405..6d88191b7 100644
--- a/demo_nodes_py/demo_nodes_py/services/add_two_ints_client_async.py
+++ b/demo_nodes_py/demo_nodes_py/services/add_two_ints_client_async.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,13 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import Union
+
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.executors import ExternalShutdownException
-def main(args=None):
+def main(args: Union[list[str], None] = None) -> None:
try:
with rclpy.init(args=args):
node = rclpy.create_node('add_two_ints_client')
@@ -37,8 +40,10 @@ def main(args=None):
while rclpy.ok():
rclpy.spin_once(node)
if future.done():
- if future.result() is not None:
- logger.info('Result of add_two_ints: %d' % future.result().sum)
+ result = future.result()
+ if result is not None:
+ res_sum = result.sum
+ logger.info('Result of add_two_ints: %d' % res_sum)
else:
logger.error('Exception while calling service: %r' % future.exception())
break
diff --git a/demo_nodes_py/demo_nodes_py/services/add_two_ints_server.py b/demo_nodes_py/demo_nodes_py/services/add_two_ints_server.py
index 92799b666..478a71473 100644
--- a/demo_nodes_py/demo_nodes_py/services/add_two_ints_server.py
+++ b/demo_nodes_py/demo_nodes_py/services/add_two_ints_server.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,18 +22,23 @@
class AddTwoIntsServer(Node):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__('add_two_ints_server')
- self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
-
- def add_two_ints_callback(self, request, response):
+ self.srv = self.create_service(
+ AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
+
+ def add_two_ints_callback(
+ self,
+ request: AddTwoInts.Request,
+ response: AddTwoInts.Response,
+ ) -> AddTwoInts.Response:
response.sum = request.a + request.b
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
return response
-def main(args=None):
+def main(args: list[str] | None = None) -> None:
try:
with rclpy.init(args=args):
node = AddTwoIntsServer()
diff --git a/demo_nodes_py/demo_nodes_py/services/introspection.py b/demo_nodes_py/demo_nodes_py/services/introspection.py
index 4712902ff..a7ddb2321 100644
--- a/demo_nodes_py/demo_nodes_py/services/introspection.py
+++ b/demo_nodes_py/demo_nodes_py/services/introspection.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# Copyright 2023 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import Union
+
from example_interfaces.srv import AddTwoInts
from rcl_interfaces.msg import SetParametersResult
@@ -22,6 +25,7 @@
from rclpy.parameter import Parameter
from rclpy.qos import qos_profile_system_default
from rclpy.service_introspection import ServiceIntrospectionState
+from rclpy.task import Future
# This demo program shows how to configure client and service introspection
@@ -69,7 +73,11 @@
# In either case, service introspection data can be seen by running:
# ros2 topic echo /add_two_ints/_service_event
-def check_parameter(parameter_list, parameter_name):
+
+def check_parameter(
+ parameter_list: list[Parameter], # type: ignore[type-arg]
+ parameter_name: str
+) -> SetParametersResult:
result = SetParametersResult()
result.successful = True
for param in parameter_list:
@@ -91,10 +99,16 @@ def check_parameter(parameter_list, parameter_name):
class IntrospectionClientNode(Node):
- def on_set_parameters_callback(self, parameter_list):
+ def on_set_parameters_callback(
+ self,
+ parameter_list: list[Parameter], # type: ignore[type-arg]
+ ) -> SetParametersResult:
return check_parameter(parameter_list, 'client_configure_introspection')
- def on_post_set_parameters_callback(self, parameter_list):
+ def on_post_set_parameters_callback(
+ self,
+ parameter_list: list[Parameter], # type: ignore[type-arg]
+ ) -> None:
for param in parameter_list:
if param.name != 'client_configure_introspection':
continue
@@ -111,7 +125,7 @@ def on_post_set_parameters_callback(self, parameter_list):
introspection_state)
break
- def __init__(self):
+ def __init__(self) -> None:
super().__init__('introspection_client')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
@@ -121,9 +135,9 @@ def __init__(self):
self.declare_parameter('client_configure_introspection', 'disabled')
self.timer = self.create_timer(0.5, self.timer_callback)
- self.future = None
+ self.future: Union[Future[AddTwoInts.Response], None] = None
- def timer_callback(self):
+ def timer_callback(self) -> None:
if not self.cli.service_is_ready():
return
@@ -139,8 +153,10 @@ def timer_callback(self):
if not self.future.done():
return
- if self.future.result() is not None:
- self.get_logger().info('Result of add_two_ints: %d' % self.future.result().sum)
+ result = self.future.result()
+ if result is not None:
+ res_sum = result.sum
+ self.get_logger().info('Result of add_two_ints: %d' % res_sum)
else:
self.get_logger().error('Exception calling service: %r' % self.future.exception())
@@ -149,10 +165,16 @@ def timer_callback(self):
class IntrospectionServiceNode(Node):
- def on_set_parameters_callback(self, parameter_list):
+ def on_set_parameters_callback(
+ self,
+ parameter_list: list[Parameter], # type: ignore[type-arg]
+ ) -> SetParametersResult:
return check_parameter(parameter_list, 'service_configure_introspection')
- def on_post_set_parameters_callback(self, parameter_list):
+ def on_post_set_parameters_callback(
+ self,
+ parameter_list: list[Parameter], # type: ignore[type-arg]
+ ) -> None:
for param in parameter_list:
if param.name != 'service_configure_introspection':
continue
@@ -169,23 +191,28 @@ def on_post_set_parameters_callback(self, parameter_list):
introspection_state)
break
- def __init__(self):
+ def __init__(self) -> None:
super().__init__('introspection_service')
- self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
+ self.srv = self.create_service(
+ AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
self.add_on_set_parameters_callback(self.on_set_parameters_callback)
self.add_post_set_parameters_callback(self.on_post_set_parameters_callback)
self.declare_parameter('service_configure_introspection', 'disabled')
- def add_two_ints_callback(self, request, response):
+ def add_two_ints_callback(
+ self,
+ request: AddTwoInts.Request,
+ response: AddTwoInts.Response,
+ ) -> AddTwoInts.Response:
response.sum = request.a + request.b
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
return response
-def main(args=None):
+def main(args: Union[list[str], None] = None) -> None:
try:
with rclpy.init(args=args):
service_node = IntrospectionServiceNode()
diff --git a/demo_nodes_py/demo_nodes_py/topics/listener.py b/demo_nodes_py/demo_nodes_py/topics/listener.py
index 9f4ea6171..be46db9f9 100644
--- a/demo_nodes_py/demo_nodes_py/topics/listener.py
+++ b/demo_nodes_py/demo_nodes_py/topics/listener.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,15 +22,16 @@
class Listener(Node):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__('listener')
- self.sub = self.create_subscription(String, 'chatter', self.chatter_callback, 10)
+ self.sub = self.create_subscription(
+ String, 'chatter', self.chatter_callback, 10)
- def chatter_callback(self, msg):
+ def chatter_callback(self, msg: String) -> None:
self.get_logger().info('I heard: [%s]' % msg.data)
-def main(args=None):
+def main(args: list[str] | None = None) -> None:
try:
with rclpy.init(args=args):
node = Listener()
diff --git a/demo_nodes_py/demo_nodes_py/topics/listener_qos.py b/demo_nodes_py/demo_nodes_py/topics/listener_qos.py
index 95e134591..40e7f3802 100644
--- a/demo_nodes_py/demo_nodes_py/topics/listener_qos.py
+++ b/demo_nodes_py/demo_nodes_py/topics/listener_qos.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,7 +29,7 @@
class ListenerQos(Node):
- def __init__(self, qos_profile):
+ def __init__(self, qos_profile: QoSProfile) -> None:
super().__init__('listener_qos')
if qos_profile.reliability is QoSReliabilityPolicy.RELIABLE:
self.get_logger().info('Reliable listener')
@@ -37,11 +38,11 @@ def __init__(self, qos_profile):
self.sub = self.create_subscription(
String, 'chatter', self.chatter_callback, qos_profile)
- def chatter_callback(self, msg):
+ def chatter_callback(self, msg: String) -> None:
self.get_logger().info('I heard: [%s]' % msg.data)
-def main(argv=sys.argv[1:]):
+def main(argv: list[str] = sys.argv[1:]) -> None:
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
'--reliable', dest='reliable', action='store_true',
diff --git a/demo_nodes_py/demo_nodes_py/topics/listener_serialized.py b/demo_nodes_py/demo_nodes_py/topics/listener_serialized.py
index 25d49fc99..09a551cbd 100644
--- a/demo_nodes_py/demo_nodes_py/topics/listener_serialized.py
+++ b/demo_nodes_py/demo_nodes_py/topics/listener_serialized.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# Copyright 2018 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import Union
+
from example_interfaces.msg import String
import rclpy
@@ -21,7 +24,7 @@
class SerializedSubscriber(Node):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__('serialized_subscriber')
self.subscription = self.create_subscription(
String,
@@ -32,11 +35,11 @@ def __init__(self):
# We're subscribing to the serialized bytes,
# not example_interfaces.msg.String
- def listener_callback(self, msg):
- self.get_logger().info('I heard: "%s"' % msg)
+ def listener_callback(self, msg: bytes) -> None:
+ self.get_logger().info('I heard: "%r"' % msg)
-def main(args=None):
+def main(args: Union[list[str], None] = None) -> None:
try:
with rclpy.init(args=args):
serialized_subscriber = SerializedSubscriber()
diff --git a/demo_nodes_py/demo_nodes_py/topics/talker.py b/demo_nodes_py/demo_nodes_py/topics/talker.py
index 026b22d10..4f16db25b 100644
--- a/demo_nodes_py/demo_nodes_py/topics/talker.py
+++ b/demo_nodes_py/demo_nodes_py/topics/talker.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import Union
+
from example_interfaces.msg import String
import rclpy
@@ -21,14 +24,14 @@
class Talker(Node):
- def __init__(self):
+ def __init__(self) -> None:
super().__init__('talker')
self.i = 0
self.pub = self.create_publisher(String, 'chatter', 10)
timer_period = 1.0
self.tmr = self.create_timer(timer_period, self.timer_callback)
- def timer_callback(self):
+ def timer_callback(self) -> None:
msg = String()
msg.data = 'Hello World: {0}'.format(self.i)
self.i += 1
@@ -36,7 +39,7 @@ def timer_callback(self):
self.pub.publish(msg)
-def main(args=None):
+def main(args: Union[list[str], None] = None) -> None:
try:
with rclpy.init(args=args):
node = Talker()
diff --git a/demo_nodes_py/demo_nodes_py/topics/talker_qos.py b/demo_nodes_py/demo_nodes_py/topics/talker_qos.py
index 99c695e00..644b51b28 100644
--- a/demo_nodes_py/demo_nodes_py/topics/talker_qos.py
+++ b/demo_nodes_py/demo_nodes_py/topics/talker_qos.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,9 +29,9 @@
class TalkerQos(Node):
- def __init__(self, qos_profile):
+ def __init__(self, qos_profile: QoSProfile) -> None:
super().__init__('talker_qos')
- self.i = 0
+ self.i: int = 0
if qos_profile.reliability is QoSReliabilityPolicy.RELIABLE:
self.get_logger().info('Reliable talker')
else:
@@ -40,7 +41,7 @@ def __init__(self, qos_profile):
timer_period = 1.0
self.tmr = self.create_timer(timer_period, self.timer_callback)
- def timer_callback(self):
+ def timer_callback(self) -> None:
msg = String()
msg.data = 'Hello World: {0}'.format(self.i)
self.i += 1
@@ -48,7 +49,7 @@ def timer_callback(self):
self.pub.publish(msg)
-def main(argv=sys.argv[1:]):
+def main(argv: list[str] = sys.argv[1:]) -> None:
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
'--reliable', dest='reliable', action='store_true',
diff --git a/demo_nodes_py/package.xml b/demo_nodes_py/package.xml
index aa7e3ebde..bc9b4a113 100644
--- a/demo_nodes_py/package.xml
+++ b/demo_nodes_py/package.xml
@@ -26,6 +26,7 @@
ament_copyright
ament_flake8
ament_pep257
+ ament_mypy
ament_xmllint
python3-pytest
diff --git a/demo_nodes_py/test/test_copyright.py b/demo_nodes_py/test/test_copyright.py
index cc8ff03f7..1f8698345 100644
--- a/demo_nodes_py/test/test_copyright.py
+++ b/demo_nodes_py/test/test_copyright.py
@@ -18,6 +18,6 @@
@pytest.mark.copyright
@pytest.mark.linter
-def test_copyright():
+def test_copyright() -> None:
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'
diff --git a/demo_nodes_py/test/test_flake8.py b/demo_nodes_py/test/test_flake8.py
index 27ee1078f..eac16eef9 100644
--- a/demo_nodes_py/test/test_flake8.py
+++ b/demo_nodes_py/test/test_flake8.py
@@ -18,7 +18,7 @@
@pytest.mark.flake8
@pytest.mark.linter
-def test_flake8():
+def test_flake8() -> None:
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
diff --git a/demo_nodes_py/test/test_mypy.py b/demo_nodes_py/test/test_mypy.py
new file mode 100644
index 000000000..afc6a332c
--- /dev/null
+++ b/demo_nodes_py/test/test_mypy.py
@@ -0,0 +1,21 @@
+# Copyright 2026 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ament_mypy.main import main
+
+
+def test_mypy() -> None:
+ rc = main()
+ assert rc == 0, 'Found code style errors / warnings'
diff --git a/demo_nodes_py/test/test_pep257.py b/demo_nodes_py/test/test_pep257.py
index b234a3840..b6808e1d8 100644
--- a/demo_nodes_py/test/test_pep257.py
+++ b/demo_nodes_py/test/test_pep257.py
@@ -18,6 +18,6 @@
@pytest.mark.linter
@pytest.mark.pep257
-def test_pep257():
+def test_pep257() -> None:
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'