Skip to content

Commit

Permalink
TimeSource calls callbacks when time jumps
Browse files Browse the repository at this point in the history
callbacks on ROS time activation or deactivation
callbacks on forward time jumps
callbacks on backward time jumps
  • Loading branch information
sloretz committed Aug 10, 2018
1 parent 0db8fc7 commit 956c663
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 11 deletions.
95 changes: 95 additions & 0 deletions rclpy/rclpy/clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

from enum import IntEnum

import weakref

from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy


Expand All @@ -29,6 +31,61 @@ class ClockType(IntEnum):
STEADY_TIME = 3


class ClockChange(IntEnum):

# The time type before and after the jump is ROS_TIME
ROS_TIME_NO_CHANGE = 1
# The time type switched to ROS_TIME from SYSTEM_TIME
ROS_TIME_ACTIVATED = 2
# The time type switched to SYSTEM_TIME from ROS_TIME
ROS_TIME_DEACTIVATED = 3
# The time type before and after the jump is SYSTEM_TIME
SYSTEM_TIME_NO_CHANGE = 4


class JumpThreshold:

def __init__(self, *, min_forward, min_backward, on_clock_change=True):
"""
Initialize an instance of JumpThreshold.
:param min_forward: Minimum jump forwards to be considered exceeded, or None.
:param min_backward: Negative duration indicating minimum jump backwards to be considered
exceeded, or None.
:param on_clock_change: True to make a callback happen when ROS_TIME is activated
or deactivated.
"""
self.min_forward = min_forward
self.min_backward = min_backward
self.on_clock_change = on_clock_change

def exceeded(self, jump_info):
"""Return true if the time jump should trigger a callback with this threshold."""
clock_type_changes = (ClockChange.ROS_TIME_ACTIVATED, ClockChange.ROS_TIME_DEACTIVATED)
if self.on_clock_change and jump_info.clock_change in clock_type_changes:
return True
elif self.min_forward is not None and jump_info.delta >= self.min_forward:
return True
elif self.min_backward is not None and jump_info.delta <= self.min_backward:
return True
return False


class JumpHandler:

def __init__(self, *, threshold, pre_callback, post_callback):
"""
Initialize an instance of JumpHandler.
:param threshold: Criteria for activating time jump.
:param pre_callback: Callback to be called before new time is set.
:param post_callback: Callback to be called after new time is set.
"""
self.threshold = threshold
self.pre_callback = pre_callback
self.post_callback = post_callback


class Clock:

def __new__(cls, *, clock_type=ClockType.SYSTEM_TIME):
Expand All @@ -40,6 +97,7 @@ def __new__(cls, *, clock_type=ClockType.SYSTEM_TIME):
self = super().__new__(cls)
self._clock_handle = _rclpy.rclpy_create_clock(clock_type)
self._clock_type = clock_type
self._active_jump_handlers = []
return self

@property
Expand All @@ -57,6 +115,43 @@ def now(self):
nanoseconds=_rclpy.rclpy_time_point_get_nanoseconds(time_handle),
clock_type=self.clock_type)

def get_triggered_callback_handlers(self, jump_info):
"""Yield time jump callbacks that would be triggered by the given jump info."""
to_remove = []
for idx, handler_weakref in enumerate(self._active_jump_handlers):
handler = handler_weakref()
if handler is None:
to_remove.append(idx)
elif handler.threshold.exceeded(jump_info):
yield handler
# Remove invalid jump handlers
for idx in reversed(to_remove):
del self._active_jump_handlers[idx]

def create_jump_callback(self, threshold, *, pre_callback=None, post_callback=None):
"""
Create callback handler for clock time jumps.
The callbacks must remain valid as long as the returned JumpHandler is valid.
A callback takes a single argument of an instance of :class:`rclpy.time_source.TimeJump`.
A callback should execute as quick as possible and must not block when called.
:param threshold: Criteria for activating time jump.
:param pre_callback: Callback to be called before new time is set.
:param post_callback: Callback to be called after new time is set.
:rtype: :class:`rclpy.clock.JumpHandler`
"""
if pre_callback is None and post_callback is None:
raise ValueError('One of pre_callback or post_callback must be callable')
if pre_callback is not None and not callable(pre_callback):
raise ValueError('pre_callback must be callable if given')
if post_callback is not None and not callable(post_callback):
raise ValueError('post_callback must be callable if given')
handler = JumpHandler(
threshold=threshold, pre_callback=pre_callback, post_callback=post_callback)
self._active_jump_handlers.append(weakref.ref(handler))
return handler


class ROSClock(Clock):

Expand Down
66 changes: 55 additions & 11 deletions rclpy/rclpy/time_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,41 @@
# limitations under the License.

import builtin_interfaces.msg
from rclpy.clock import ClockChange
from rclpy.clock import ClockType
from rclpy.clock import ROSClock
from rclpy.duration import Duration
from rclpy.time import Time

CLOCK_TOPIC = '/clock'


class TimeJump:

def __init__(self, clock_change, delta):
if not isinstance(clock_change, ClockChange):
raise TypeError('clock_change must be an instance of rclpy.clock.ClockChange')
# Access through read only properties because same instance is given to all clock callbacks
self._clock_change = clock_change
self._delta = delta

@property
def clock_change(self):
return self._clock_change

@property
def delta(self):
return self._delta


class TimeSource:

def __init__(self, *, node=None):
self._clock_sub = None
self._node = None
self._associated_clocks = []
self._last_time_set = None
# Zero time is a special value that means time is uninitialzied
self._last_time_set = Time(clock_type=ClockType.ROS_TIME)
self._ros_time_is_active = False
if node is not None:
self.attach_node(node)
Expand All @@ -36,11 +58,16 @@ def ros_time_is_active(self):

@ros_time_is_active.setter
def ros_time_is_active(self, enabled):
if self._ros_time_is_active == enabled:
return
self._ros_time_is_active = enabled
for clock in self._associated_clocks:
clock._set_ros_time_is_active(enabled)
clock_change = ClockChange.ROS_TIME_DEACTIVATED
if enabled:
clock_change = ClockChange.ROS_TIME_ACTIVATED
self._subscribe_to_clock_topic()
for clock in self._associated_clocks:
# Set time and trigger any time jump callbacks on clock change
self._set_clock(self._last_time_set, clock, TimeJump(clock_change, Duration()))

def _subscribe_to_clock_topic(self):
if self._clock_sub is None and self._node is not None:
Expand Down Expand Up @@ -73,18 +100,35 @@ def detach_node(self):
def attach_clock(self, clock):
if not isinstance(clock, ROSClock):
raise ValueError('Only clocks with type ROS_TIME can be attached.')
if self._last_time_set is not None:
self._set_clock(self._last_time_set, clock)
clock._set_ros_time_is_active(self.ros_time_is_active)

jump_info = None
if self.ros_time_is_active:
jump_info = TimeJump(ClockChange.ROS_TIME_NO_CHANGE, Duration())
else:
jump_info = TimeJump(ClockChange.SYSTEM_TIME_NO_CHANGE, Duration())
self._set_clock(self._last_time_set, clock, jump_info)

self._associated_clocks.append(clock)

def clock_callback(self, msg):
# Cache the last message in case a new clock is attached.
time_from_msg = Time.from_msg(msg)
jump_info = TimeJump(ClockChange.ROS_TIME_NO_CHANGE, time_from_msg - self._last_time_set)
# Cache the last message in case a new clock is attached.
self._last_time_set = time_from_msg
for clock in self._associated_clocks:
self._set_clock(time_from_msg, clock)
# Only notify clocks of new time if ROS time is active
if self.ros_time_is_active:
for clock in self._associated_clocks:
self._set_clock(time_from_msg, clock, jump_info)

def _set_clock(self, time, clock):
# TODO(dhood): clock jump notifications
def _set_clock(self, time, clock, jump_info):
# Must not call pre jump handlers in executor because they are required to complete
# before the clock time changes
jump_handlers = tuple(clock.get_triggered_callback_handlers(jump_info))
for jump_handler in jump_handlers:
if jump_handler.pre_callback is not None:
jump_handler.pre_callback(jump_info)
clock._set_ros_time_is_active(self.ros_time_is_active)
clock.set_ros_time_override(time)
for jump_handler in jump_handlers:
if jump_handler.post_callback is not None:
jump_handler.post_callback(jump_info)
62 changes: 62 additions & 0 deletions rclpy/test/test_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,17 @@
import unittest

from rclpy.clock import Clock
from rclpy.clock import ClockChange
from rclpy.clock import ClockType
from rclpy.clock import JumpThreshold
from rclpy.clock import ROSClock
from rclpy.duration import Duration
from rclpy.time import Time
from rclpy.time_source import TimeJump


def do_nothing(jump_info):
pass


class TestClock(unittest.TestCase):
Expand Down Expand Up @@ -64,3 +72,57 @@ def test_clock_now(self):
now2 = clock.now()
assert now2 > now
now = now2

def test_get_triggered_no_callbacks(self):
jump_info = TimeJump(ClockChange.ROS_TIME_ACTIVATED, Duration())
assert [] == list(ROSClock().get_triggered_callback_handlers(jump_info))

def test_get_triggered_no_callbacks_match_clock_change(self):
jump_info = TimeJump(ClockChange.ROS_TIME_ACTIVATED, Duration())
assert [] == list(ROSClock().get_triggered_callback_handlers(jump_info))

def test_get_triggered_callbacks_time_jump(self):
one_second = Duration(seconds=1)
half_second = Duration(seconds=0.5)
negative_half_second = Duration(seconds=-0.5)
negative_one_second = Duration(seconds=-1)

threshold1 = JumpThreshold(
min_forward=one_second, min_backward=negative_half_second, on_clock_change=False)
threshold2 = JumpThreshold(
min_forward=half_second, min_backward=negative_one_second, on_clock_change=False)

forward_jump_info = TimeJump(ClockChange.ROS_TIME_NO_CHANGE, Duration(seconds=0.75))
backward_jump_info = TimeJump(ClockChange.ROS_TIME_NO_CHANGE, Duration(seconds=-0.75))

clock = ROSClock()
handler1 = clock.create_jump_callback(threshold1, pre_callback=do_nothing)
handler2 = clock.create_jump_callback(threshold2, pre_callback=do_nothing)

assert [handler2] == list(clock.get_triggered_callback_handlers(forward_jump_info))
assert [handler1] == list(clock.get_triggered_callback_handlers(backward_jump_info))

def test_get_triggered_callbacks_clock_change(self):
one_second = Duration(seconds=1)
negative_one_second = Duration(seconds=-1)

threshold1 = JumpThreshold(
min_forward=one_second, min_backward=negative_one_second, on_clock_change=False)
threshold2 = JumpThreshold(min_forward=None, min_backward=None, on_clock_change=True)
threshold3 = JumpThreshold(
min_forward=one_second, min_backward=negative_one_second, on_clock_change=True)

clock_change1 = TimeJump(ClockChange.ROS_TIME_ACTIVATED, Duration())
clock_change2 = TimeJump(ClockChange.ROS_TIME_DEACTIVATED, Duration())
no_change1 = TimeJump(ClockChange.ROS_TIME_NO_CHANGE, Duration())
no_change2 = TimeJump(ClockChange.SYSTEM_TIME_NO_CHANGE, Duration())

clock = ROSClock()
handler1 = clock.create_jump_callback(threshold1, pre_callback=do_nothing) # noqa
handler2 = clock.create_jump_callback(threshold2, pre_callback=do_nothing)
handler3 = clock.create_jump_callback(threshold3, pre_callback=do_nothing)

assert [handler2, handler3] == list(clock.get_triggered_callback_handlers(clock_change1))
assert [handler2, handler3] == list(clock.get_triggered_callback_handlers(clock_change2))
assert [] == list(clock.get_triggered_callback_handlers(no_change1))
assert [] == list(clock.get_triggered_callback_handlers(no_change2))
Loading

0 comments on commit 956c663

Please sign in to comment.