Skip to content

Commit

Permalink
Merge pull request #202 from stlehmann/fix_testserver
Browse files Browse the repository at this point in the history
Testserver: SUMUP_READ and SUMUP_WRITE with variable length
  • Loading branch information
stlehmann committed Feb 17, 2021
2 parents 0c93dfe + 8c71797 commit b2dd269
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 34 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
* [#195](https://github.com/stlehmann/pyads/pull/195) Read/write by name without passing the datatype

### Changed
* [#202](https://github.com/stlehmann/pyads/pull/202) Testserver now support variable sumread and sumwrite with
variable length for uint8 and string datatypes

### Removed

Expand Down
30 changes: 22 additions & 8 deletions pyads/testserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,14 +492,28 @@ def handle_request(self, request):
)

elif index_group == constants.ADSIGRP_SUMUP_READ:
# Could be improved to handle variable length requests
response_value = struct.pack(
"<IIIIBB4sB", 0, 0, 0, 1, 1, 2,
("test" + "\x00").encode("utf-8"), 0
)
n_reads = len(write_data) // 12
fmt = "<" + n_reads * "I"
vals = [0 for i in range(n_reads)]

for i in range(n_reads):
buf = write_data[i * 12 + 8:i * 12 + 12]
is_str = struct.unpack("<I", buf)[0] == 5

if is_str:
fmt += "4s"
vals.append(b"test\x00")
else:
fmt += "B"
vals.append(i + 1)
response_value = struct.pack(fmt, *vals)
print(response_value)

elif index_group == constants.ADSIGRP_SUMUP_WRITE:
response_value = struct.pack("<IIII", 0, 0, 0, 1)
n_writes = len(write_data) // 12
fmt = "<" + n_writes * "I"
vals = n_writes * [0]
response_value = struct.pack(fmt, *vals)

elif response_length > 0:
# Create response of repeated 0x0F with a null terminator for strings
Expand Down Expand Up @@ -817,10 +831,10 @@ def handle_read_state():
# type: () -> bytes
"""Handle read-state request."""
logger.info("Command received: READ_STATE")
ads_state = struct.pack("<I", constants.ADSSTATE_RUN)
ads_state = struct.pack("<H", constants.ADSSTATE_RUN)
# I don't know what an appropriate value for device state is.
# I suspect it may be unused..
device_state = struct.pack("<I", 0)
device_state = struct.pack("<H", 0)
return ads_state + device_state

def handle_writectrl():
Expand Down
52 changes: 26 additions & 26 deletions tests/test_connection_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ def test_get_and_release_handle(self):
self.assert_command_id(requests[1], constants.ADSCOMMAND_WRITE)

def test_read_list(self):
variables = ["TestVar1", "TestVar2", "str_TestVar3", "TestVar4"]
variables = ["i1", "i2", "i3", "str_test"]

# Read twice to show caching
with self.plc:
Expand All @@ -1059,18 +1059,18 @@ def test_read_list(self):

# Expected result
expected_result = {
"TestVar1": 1,
"TestVar2": 2,
"str_TestVar3": "test",
"TestVar4": "Internal error",
"i1": 1,
"i2": 2,
"i3": 3,
"str_test": "test",
}
self.assertEqual(read_values, expected_result)
self.assertEqual(read_values2, expected_result)

def test_read_list_without_cache(self):

# Repeat the test without cache
variables = ["TestVar1", "TestVar2", "str_TestVar3", "TestVar4"]
variables = ["i1", "i2", "i3", "str_test"]

with self.plc:
read_values = self.plc.read_list_by_name(variables, cache_symbol_info=False)
Expand All @@ -1090,19 +1090,19 @@ def test_read_list_without_cache(self):

# Expected result
expected_result = {
"TestVar1": 1,
"TestVar2": 2,
"str_TestVar3": "test",
"TestVar4": "Internal error",
"i1": 1,
"i2": 2,
"i3": 3,
"str_test": "test",
}
self.assertEqual(read_values, expected_result)

def test_write_list(self):
variables = {
"TestVar1": 1,
"TestVar2": 2,
"str_TestVar3": "test",
"TestVar4": 3,
"i1": 1,
"i2": 2,
"i3": 3,
"str_test": "test",
}

with self.plc:
Expand All @@ -1123,19 +1123,19 @@ def test_write_list(self):

# Expected result
expected_result = {
"TestVar1": "no error",
"TestVar2": "no error",
"str_TestVar3": "no error",
"TestVar4": "Internal error",
"i1": "no error",
"i2": "no error",
"i3": "no error",
"str_test": "no error",
}
self.assertEqual(errors, expected_result)

def test_write_list_without_cache(self):
variables = {
"TestVar1": 1,
"TestVar2": 2,
"str_TestVar3": "test",
"TestVar4": 3,
"i1": 1,
"i2": 2,
"i3": 3,
"str_test": "test",
}

with self.plc:
Expand All @@ -1156,10 +1156,10 @@ def test_write_list_without_cache(self):

# Expected result
expected_result = {
"TestVar1": "no error",
"TestVar2": "no error",
"str_TestVar3": "no error",
"TestVar4": "Internal error",
"i1": "no error",
"i2": "no error",
"i3": "no error",
"str_test": "no error",
}
self.assertEqual(errors, expected_result)

Expand Down

0 comments on commit b2dd269

Please sign in to comment.