-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a10fbb9
commit 4cbf39e
Showing
4 changed files
with
136 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
import hal | ||
import logging | ||
|
||
logger = logging.getLogger("simple_watchdog") | ||
|
||
__all__ = ["SimpleWatchdog"] | ||
|
||
|
||
class SimpleWatchdog: | ||
"""A class that's a wrapper around a watchdog timer. | ||
When the timer expires, a message is printed to the console and an optional user-provided | ||
callback is invoked. | ||
The watchdog is initialized disabled, so the user needs to call enable() before use. | ||
.. note:: This is a simpler replacement for the :class:`wpilib.Watchdog`, | ||
and should function mostly the same (except that this watchdog will | ||
not detect infinite loops). | ||
.. warning:: This watchdog is not threadsafe | ||
""" | ||
|
||
# Used for timeout print rate-limiting | ||
kMinPrintPeriod = 1000000 # us | ||
|
||
def __init__(self, timeout: float): | ||
"""Watchdog constructor. | ||
:param timeout: The watchdog's timeout in seconds with microsecond resolution. | ||
""" | ||
self._get_time = hal.getFPGATime | ||
|
||
self._startTime = 0 # us | ||
self._timeout = int(timeout * 1e6) # us | ||
self._expirationTime = 0 # us | ||
self._lastTimeoutPrintTime = 0 # us | ||
self._lastEpochsPrintTime = 0 # us | ||
self._epochs = [] | ||
|
||
def getTime(self) -> float: | ||
"""Returns the time in seconds since the watchdog was last fed.""" | ||
return (self._get_time() - self._startTime) / 1e6 | ||
|
||
def setTimeout(self, timeout: float) -> None: | ||
"""Sets the watchdog's timeout. | ||
:param timeout: The watchdog's timeout in seconds with microsecond | ||
resolution. | ||
""" | ||
self._epochs.clear() | ||
timeout = int(timeout * 1e6) # us | ||
self._timeout = timeout | ||
self._startTime = self._get_time() | ||
self._expirationTime = self._startTime + timeout | ||
|
||
def getTimeout(self) -> float: | ||
"""Returns the watchdog's timeout in seconds.""" | ||
return self._timeout / 1e6 | ||
|
||
def isExpired(self) -> bool: | ||
"""Returns true if the watchdog timer has expired.""" | ||
return self._get_time() > self._expirationTime | ||
|
||
def addEpoch(self, epochName: str) -> None: | ||
""" | ||
Adds time since last epoch to the list printed by printEpochs(). | ||
Epochs are a way to partition the time elapsed so that when | ||
overruns occur, one can determine which parts of an operation | ||
consumed the most time. | ||
:param epochName: The name to associate with the epoch. | ||
""" | ||
self._epochs.append((epochName, self._get_time())) | ||
|
||
def printIfExpired(self) -> None: | ||
"""Prints list of epochs added so far and their times.""" | ||
now = self._get_time() | ||
if ( | ||
now > self._expirationTime | ||
and now - self._lastEpochsPrintTime > self.kMinPrintPeriod | ||
): | ||
log = logger.info | ||
self._lastEpochsPrintTime = now | ||
prev = self._startTime | ||
log("Watchdog not fed after %.6fs", (now - prev) / 1e6) | ||
for key, value in self._epochs: | ||
log("\t%s: %.6fs", key, (value - prev) / 1e6) | ||
prev = value | ||
|
||
def reset(self) -> None: | ||
"""Resets the watchdog timer. | ||
This also enables the timer if it was previously disabled. | ||
""" | ||
self.enable() | ||
|
||
def enable(self) -> None: | ||
"""Enables the watchdog timer.""" | ||
self._epochs.clear() | ||
self._startTime = self._get_time() | ||
self._expirationTime = self._startTime + self._timeout | ||
|
||
def disable(self) -> None: | ||
"""Disables the watchdog timer.""" | ||
# .. this doesn't do anything |