Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support Unix Time #42

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cayennelpp/lpp_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ def add_humitidy(self, channel, value):
hum = LppData(channel, 104, (value, ))
self.data.append(hum)

def add_unix_time(self, channel, value):
"""Create and add a unix time sensor LppData"""
hum = LppData(channel, 133, (value, ))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy paste? hum doesn't fit here, I'd say, better use ts (for timestamp) or similar - even though its only used internally

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The typo has just been addressed within #51.

self.data.append(hum)

def add_accelerometer(self, channel, x, y, z):
"""Create and add a accelerometer sensor LppData"""
acc = LppData(channel, 113, (x, y, z, ))
Expand Down
61 changes: 61 additions & 0 deletions cayennelpp/lpp_type.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from datetime import datetime
from datetime import timezone as tz
from .utils import datetime_as_utc

try:
import logging
except ImportError:
Expand Down Expand Up @@ -81,6 +85,61 @@ def lpp_analog_io_to_bytes(data):
return buf


def lpp_unix_time_from_bytes(buf):
"""
Convert a 4 byte unsigned integer (unix timestamp) to datetime object.
Assume timezone is utc.
"""
logging.debug("lpp_unix_time_from_bytes")
logging.debug(" in: bytes = %s, length = %d", buf, len(buf))
if not len(buf) == 4:
raise AssertionError()
val_i = ((buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3])
logging.debug(" out: value = %d", val_i)
if val_i >= (1 << 31):
raise ValueError("Unix timestamp can not be negative.")
logging.debug(" out: value = %d", val_i)
val = datetime.fromtimestamp(val_i, tz.utc)
return (val,)


def lpp_unix_time_to_bytes(data):
"""
Convert a datetime object or integer to unsigned 4 byte unix timestamp.
integer. Convert the datetime to utc first.
If it is an integer, assume it already is in utc timezone.
If it is a naive datetime object, assume it is in the system timezone.
"""
logging.debug("lpp_untix_time_to_bytes")
if not isinstance(data, tuple):
data = (data,)
if not len(data) == 1:
raise ValueError("Only one value allowed.")
val = data[0]
logging.debug(" in: value = %s", val)
buf = bytearray([0x00, 0x00, 0x00, 0x00])
epoch = datetime.fromtimestamp(0, tz.utc)
if isinstance(val, datetime):
val = datetime_as_utc(val.replace(microsecond=0))
# val = val.replace(microsecond=0).astimezone(tz.utc)
if val < epoch:
raise ValueError("Date/times before 1970-01-01 08:00 UTC"
"are not allowed")
val = val.timestamp()
logging.debug(" in: value = %f", val)
val_i = int(val)
logging.debug(" in: value = %i", val_i)
if val_i < 0:
raise ValueError("Negative values are not allowed")
logging.debug(" in: value = %d", val_i)
buf[0] = (val_i >> 24) & 0xff
buf[1] = (val_i >> 16) & 0xff
buf[2] = (val_i >> 8) & 0xff
buf[3] = (val_i) & 0xff
logging.debug(" out: bytes = %s, length = %d", buf, len(buf))
return buf


def lpp_illuminance_from_bytes(buf):
"""
Decode illuminance sensor data from CyaenneLPP byte buffer,
Expand Down Expand Up @@ -514,6 +573,8 @@ def __init__(self, tid, name, size, dim, decode, encode):
lpp_accel_from_bytes, lpp_accel_to_bytes),
LppType(115, 'Barometer', 2, 1,
lpp_baro_from_bytes, lpp_baro_to_bytes),
LppType(133, 'Unix Time', 4, 1,
lpp_unix_time_from_bytes, lpp_unix_time_to_bytes),
LppType(134, 'Gyrometer', 6, 3,
lpp_gyro_from_bytes, lpp_gyro_to_bytes),
LppType(136, 'GPS Location', 9, 3,
Expand Down
10 changes: 10 additions & 0 deletions cayennelpp/tests/test_lpp_data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import pytest
from datetime import datetime
from datetime import timezone as tz

from cayennelpp.lpp_data import LppData

Expand Down Expand Up @@ -26,6 +28,14 @@ def test_gps_from_bytes():
assert gps_buf == gps_dat.bytes()


def test_unix_time_from_bytes():
# 1970-01-01T08:00Z (ie unix time 0)
buff = bytearray([0x01, 0x85, 0x00, 0x00, 0x00, 0x00])
data = LppData.from_bytes(buff)
assert buff == data.bytes()
assert data.value == (datetime.fromtimestamp(0, tz.utc),)


def test_init_invalid_type():
with pytest.raises(Exception):
LppData(0, 4242, 0)
Expand Down
11 changes: 11 additions & 0 deletions cayennelpp/tests/test_lpp_frame.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import pytest
from datetime import datetime
from datetime import timezone as tz

from cayennelpp.lpp_frame import LppFrame

Expand Down Expand Up @@ -64,6 +66,15 @@ def test_add_sensors(frame):
assert len(frame.data) == 7


def test_add_unix_time(frame):
frame.add_unix_time(0, datetime.now(tz.utc))
frame.add_unix_time(1, datetime.fromtimestamp(0))
assert len(frame.data) == 2
assert frame.data[0].type == 133
assert frame.data[1].type == 133
frame.bytes()


def test_add_temperature(frame):
frame.add_temperature(2, 12.3)
frame.add_temperature(3, -32.1)
Expand Down
48 changes: 48 additions & 0 deletions cayennelpp/tests/test_lpp_type.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import pytest
from cayennelpp.utils import datetime_as_utc
from datetime import datetime, timedelta
from datetime import timezone as tz

from cayennelpp.lpp_type import (lpp_digital_io_to_bytes,
lpp_digital_io_from_bytes,
Expand All @@ -20,6 +23,8 @@
lpp_gyro_from_bytes,
lpp_gps_to_bytes,
lpp_gps_from_bytes,
lpp_unix_time_to_bytes,
lpp_unix_time_from_bytes,
get_lpp_type,
LppType)

Expand Down Expand Up @@ -98,6 +103,49 @@ def test_illuminance_negative_val():
lpp_illuminance_to_bytes((-1,))


def test_unix_time_datetime_without_tz():
now = datetime.now()
utcnow = datetime_as_utc(now.replace(microsecond=0))
vol_buf = lpp_unix_time_to_bytes((now,))
assert lpp_unix_time_from_bytes(vol_buf) == (utcnow,)


def test_unix_time_datetime_with_tz():
now = datetime.now(tz=tz(timedelta(hours=-5)))
utcnow = datetime_as_utc(now.replace(microsecond=0))
vol_buf = lpp_unix_time_to_bytes((now,))
assert lpp_unix_time_from_bytes(vol_buf) == (utcnow,)


def test_unix_time_int():
val = datetime.fromtimestamp(5, tz.utc)
vol_buf = lpp_unix_time_to_bytes((5,))
assert lpp_unix_time_from_bytes(vol_buf) == (val,)


def test_unix_time_invalid_buf():
with pytest.raises(Exception):
lpp_unix_time_from_bytes(bytearray([0x00]))


def test_unix_time_invalid_val():
with pytest.raises(Exception):
lpp_unix_time_to_bytes((0, 1))
val = datetime.fromtimestamp(-5, tz.utc)
with pytest.raises(ValueError):
# negative value
lpp_unix_time_to_bytes((val,))


def test_unix_time_negative_val():
with pytest.raises(Exception):
lpp_unix_time_to_bytes((-1,))
with pytest.raises(Exception):
# -4 years (-4*365*24*3600 seconds)
buff = bytearray([0xf8, 0x7b, 0x32, 0x0])
lpp_unix_time_from_bytes(buff)


def test_presence():
val = (0,)
pre_buf = lpp_presence_to_bytes(val)
Expand Down
11 changes: 11 additions & 0 deletions cayennelpp/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from datetime import datetime, timedelta
from datetime import timezone as tz

from cayennelpp.utils import datetime_as_utc


def test_datetime_as_utc():
"""Test converting naive datetime to utc datetime."""
now = datetime.now().replace(microsecond=0, second=0)
utcnow = datetime.now(tz.utc).replace(microsecond=0, second=0)
assert datetime_as_utc(now) - utcnow <= timedelta(seconds=2)
19 changes: 19 additions & 0 deletions cayennelpp/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from datetime import datetime
from datetime import timezone as tz


def datetime_as_utc(dt):
"""Helper function necessary for pyton <=3.5
to convert naive datetime to utc"""
try:
# works with naive datetime object in python 3.6+
return dt.astimezone(tz.utc)
except ValueError:
# necessary in python <= 3.5
pass
if dt.tzinfo is None:
now = datetime.now().replace(microsecond=0, second=0)
utcnow = datetime.utcnow().replace(microsecond=0, second=0)
localtz = tz(now - utcnow)
dt = datetime.fromtimestamp(dt.timestamp(), localtz)
return dt.astimezone(tz.utc)