Skip to content

Commit

Permalink
Added tests for data readouts
Browse files Browse the repository at this point in the history
  • Loading branch information
Krolken committed May 24, 2019
1 parent 0eb4c8a commit 5f39c94
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 91 deletions.
63 changes: 0 additions & 63 deletions iec62056_21/messages.py
Expand Up @@ -13,57 +13,6 @@
regex_data_just_value = re.compile(r"^\((.*)\)")


def validate_max_length(input, max_length, exclude_chars=None):
"""
Will validate that max length.
:param input:
:param max_length:
:param exclude_chars:
"""
if input:
_input = input
if exclude_chars:
for char in exclude_chars:
_input.strip(char)

if len(_input) > max_length:
raise ValueError(f"{input!r} is longer than max chars: {max_length})")


def validate_value_length(data):
"""
Will validate value lenght.
:param data:
"""
max_length = 32
exclude_chars = ["(", ")", "/", "!"]
validate_max_length(data, max_length, exclude_chars=exclude_chars)


def validate_address_length(data):
"""
Will validate correct address lenght.
:param data:
"""
max_length = 16
exclude_chars = ["(", ")", "/", "!"]
validate_max_length(data, max_length, exclude_chars=exclude_chars)


def validate_unit_length(data):
"""
Will validate correct unit lenght.
:param data:
"""
max_length = 16
exclude_chars = ["(", ")", "/", "!"]
validate_max_length(data, max_length, exclude_chars=exclude_chars)


class Iec6205621Data:
"""
Base class for IEC 62056-21 messages.
Expand Down Expand Up @@ -109,18 +58,6 @@ def __init__(self, value, address=None, unit=None, no_address=False):
self.value = value
self.unit = unit

if not self.is_valid():
raise ValidationError("Input to DataSet is not valid")

def is_valid(self):
try:
validate_address_length(self.address)
validate_value_length(self.value)
validate_unit_length(self.unit)
return True
except ValueError:
return False

def to_representation(self):
if self.unit:
return f"{self.address}({self.value}*{self.unit})"
Expand Down
146 changes: 118 additions & 28 deletions tests/test_data.py
@@ -1,77 +1,83 @@
import pytest

from iec62056_21.messages import DataSet, DataLine, CommandMessage, AnswerDataMessage
from iec62056_21.exceptions import Iec6205621ParseError, ValidationError
from iec62056_21 import messages, exceptions, constants


class TestDataSetWithUnit:
class TestDataSets:

data_set_with_unit = "3.1.0(100*kWh)"
data_set_without_unit = "3.1.0(100)"
not_valid_data = '"Tralalalala'

def test_from_string_with_unit(self):
ds = DataSet.from_representation(self.data_set_with_unit)
ds = messages.DataSet.from_representation(self.data_set_with_unit)
assert ds.value == "100"
assert ds.address == "3.1.0"
assert ds.unit == "kWh"

def test_from_bytes_with_unit(self):
ds_bytes = self.data_set_with_unit.encode("latin-1")
ds = DataSet.from_representation(self.data_set_with_unit)
ds = messages.DataSet.from_representation(self.data_set_with_unit)
assert ds.value == "100"
assert ds.address == "3.1.0"
assert ds.unit == "kWh"

def test_to_string_with_unit(self):
ds = messages.DataSet(value="100", address="3.1.0", unit="kWh")
assert ds.to_representation() == self.data_set_with_unit

def test_to_byte_with_unit(self):
ds = messages.DataSet(value="100", address="3.1.0", unit="kWh")
assert ds.to_bytes() == self.data_set_with_unit.encode(constants.ENCODING)

def test_from_string_without_unit(self):
ds = DataSet.from_representation(self.data_set_without_unit)
ds = messages.DataSet.from_representation(self.data_set_without_unit)
assert ds.value == "100"
assert ds.address == "3.1.0"
assert ds.unit == None

def test_from_bytes_without_unit(self):
ds_bytes = self.data_set_without_unit.encode("latin-1")
ds = DataSet.from_representation(self.data_set_without_unit)
ds = messages.DataSet.from_representation(self.data_set_without_unit)
assert ds.value == "100"
assert ds.address == "3.1.0"
assert ds.unit == None

def test_invalid_data(self):
with pytest.raises(Iec6205621ParseError):
ds = DataSet.from_representation(self.not_valid_data)
def test_to_string_without_unit(self):
ds = messages.DataSet(value="100", address="3.1.0", unit=None)
assert ds.to_representation() == self.data_set_without_unit

def test_max_length_address(self):
data_set = "12345678901234567(43*kWh)"
with pytest.raises(ValidationError):
ds = DataSet.from_representation(data_set)
def test_to_byte_without_unit(self):
ds = messages.DataSet(value="100", address="3.1.0", unit=None)
assert ds.to_bytes() == self.data_set_without_unit.encode(constants.ENCODING)

def test_invalid_data(self):
with pytest.raises(exceptions.Iec6205621ParseError):
ds = messages.DataSet.from_representation(self.not_valid_data)

def test_max_length_value(self):
data_set = "3:0:0(123456789012345678901234567890123*kWh)"
with pytest.raises(ValidationError):
ds = DataSet.from_representation(data_set)

def test_max_length_unit(self):
data_set = "3:0:0(43*veryverylonglonglonglongunit)"
with pytest.raises(ValidationError):
ds = DataSet.from_representation(data_set)
class TestBase:
def test_to_bytes(self):
with pytest.raises(NotImplementedError):
messages.Iec6205621Data().to_bytes()


class TestDataLine:
def test_from_representation(self):
string_data = "12(12*kWh)13(13*kWh)14(14*kwh)"

dl = DataLine.from_representation(string_data)
dl = messages.DataLine.from_representation(string_data)

assert len(dl.data_sets) == 3
assert dl.data_sets[0].value == "12"
assert dl.data_sets[0].address == "12"
assert dl.data_sets[0].unit == "kWh"

def test_to_representation(self):
dl = DataLine(
dl = messages.DataLine(
data_sets=[
DataSet(address="3:14", value="314", unit="kWh"),
DataSet(address="4:15", value="415", unit="kWh"),
messages.DataSet(address="3:14", value="314", unit="kWh"),
messages.DataSet(address="4:15", value="415", unit="kWh"),
]
)

Expand All @@ -80,10 +86,74 @@ def test_to_representation(self):
assert rep == "3:14(314*kWh)4:15(415*kWh)"


class TestDataBlock:
def test_from_representation_single_line(self):
string_data = "12(12*kWh)13(13*kWh)14(14*kwh)\r\n"

db = messages.DataBlock.from_representation(string_data)

assert len(db.data_lines) == 1

def test_from_representation_several_lines(self):
string_data = (
"12(12*kWh)13(13*kWh)14(14*kwh)\r\n"
"12(12*kWh)13(13*kWh)14(14*kwh)\r\n"
"12(12*kWh)13(13*kWh)14(14*kwh)\r\n"
)
db = messages.DataBlock.from_representation(string_data)

assert len(db.data_lines) == 3

def test_to_representation_several_lines(self):

db = messages.DataBlock(
data_lines=[
messages.DataLine(
data_sets=[
messages.DataSet(address="3:14", value="314", unit="kWh"),
messages.DataSet(address="4:15", value="415", unit="kWh"),
]
),
messages.DataLine(
data_sets=[
messages.DataSet(address="3:14", value="314", unit="kWh"),
messages.DataSet(address="4:15", value="415", unit="kWh"),
]
),
messages.DataLine(
data_sets=[
messages.DataSet(address="3:14", value="314", unit="kWh"),
messages.DataSet(address="4:15", value="415", unit="kWh"),
]
),
]
)

assert db.to_representation() == (
"3:14(314*kWh)4:15(415*kWh)\r\n"
"3:14(314*kWh)4:15(415*kWh)\r\n"
"3:14(314*kWh)4:15(415*kWh)\r\n"
)

def test_to_representation_single_line(self):
db = messages.DataBlock(
data_lines=[
messages.DataLine(
data_sets=[
messages.DataSet(address="3:14", value="314", unit="kWh"),
messages.DataSet(address="4:15", value="415", unit="kWh"),
]
)
]
)

assert db.to_representation() == "3:14(314*kWh)4:15(415*kWh)\r\n"


class TestCommandMessage:
def test_parse_p0_messge(self):
data = b"\x01P0\x02(1234567)\x03P"
cm = CommandMessage.from_bytes(data)
cm = messages.CommandMessage.from_bytes(data)

assert cm.command == "P"
assert cm.command_type == 0
Expand All @@ -96,7 +166,27 @@ class TestAnswerDataMessage:
def test_parse_read_response(self):
data = b"\x023:171.0(0)\x03\x12"

am = AnswerDataMessage.from_bytes(data)
am = messages.AnswerDataMessage.from_bytes(data)

assert am.data_block.data_lines[0].data_sets[0].value == "0"
assert am.data_block.data_lines[0].data_sets[0].address == "3:171.0"
assert am.data[0].value == "0"
assert am.data[0].address == "3:171.0"


class TestReadoutDataMessage:
def test_to_representation(self):
rdm = messages.ReadoutDataMessage(
data_block=messages.DataBlock(
data_lines=[
messages.DataLine(
data_sets=[
messages.DataSet(address="3:14", value="314", unit="kWh"),
messages.DataSet(address="4:15", value="415", unit="kWh"),
]
)
]
)
)

assert rdm.to_representation() == '\x023:14(314*kWh)4:15(415*kWh)\r\n!\r\n\x03"'

0 comments on commit 5f39c94

Please sign in to comment.