Skip to content

Commit c0a4ae5

Browse files
sveinseacolomb
andauthored
Correctly apply offset to custom timestamp value (fixes #563) (#576)
Co-authored-by: André Colomb <src@andre.colomb.de>
1 parent 09c1ddd commit c0a4ae5

File tree

2 files changed

+30
-3
lines changed

2 files changed

+30
-3
lines changed

canopen/timestamp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def transmit(self, timestamp: Optional[float] = None):
3131
:param float timestamp:
3232
Optional Unix timestamp to use, otherwise the current time is used.
3333
"""
34-
delta = timestamp or time.time() - OFFSET
34+
delta = (timestamp or time.time()) - OFFSET
3535
days, seconds = divmod(delta, ONE_DAY)
3636
data = TIME_OF_DAY_STRUCT.pack(int(seconds * 1000), int(days))
3737
self.network.send_message(self.cob_id, data)

test/test_time.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,49 @@
1+
import struct
2+
import time
13
import unittest
4+
from datetime import datetime
5+
from unittest.mock import patch
26

37
import canopen
8+
import canopen.timestamp
49

510

611
class TestTime(unittest.TestCase):
712

13+
def test_epoch(self):
14+
"""Verify that the epoch matches the standard definition."""
15+
epoch = datetime.strptime(
16+
"1984-01-01 00:00:00 +0000", "%Y-%m-%d %H:%M:%S %z"
17+
).timestamp()
18+
self.assertEqual(int(epoch), canopen.timestamp.OFFSET)
19+
820
def test_time_producer(self):
921
network = canopen.Network()
1022
network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
1123
network.connect(interface="virtual", receive_own_messages=True)
1224
producer = canopen.timestamp.TimeProducer(network)
13-
producer.transmit(1486236238)
25+
26+
# Provide a specific time to verify the proper encoding
27+
producer.transmit(1_927_999_438) # 2031-02-04T19:23:58+00:00
1428
msg = network.bus.recv(1)
15-
network.disconnect()
1629
self.assertEqual(msg.arbitration_id, 0x100)
1730
self.assertEqual(msg.dlc, 6)
1831
self.assertEqual(msg.data, b"\xb0\xa4\x29\x04\x31\x43")
1932

33+
# Test again with the current time as implicit timestamp
34+
current = time.time()
35+
with patch("canopen.timestamp.time.time", return_value=current):
36+
current_from_epoch = current - canopen.timestamp.OFFSET
37+
producer.transmit()
38+
msg = network.bus.recv(1)
39+
self.assertEqual(msg.arbitration_id, 0x100)
40+
self.assertEqual(msg.dlc, 6)
41+
ms, days = struct.unpack("<LH", msg.data)
42+
self.assertEqual(days, int(current_from_epoch) // 86400)
43+
self.assertEqual(ms, int(current_from_epoch % 86400 * 1000))
44+
45+
network.disconnect()
46+
2047

2148
if __name__ == "__main__":
2249
unittest.main()

0 commit comments

Comments
 (0)