Skip to content

Commit

Permalink
Add tests for llcp logical data link tco.
Browse files Browse the repository at this point in the history
  • Loading branch information
nehpetsde committed Jun 11, 2017
1 parent ec28feb commit 694a1fa
Showing 1 changed file with 96 additions and 20 deletions.
116 changes: 96 additions & 20 deletions tests/test_llcp_tco.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,6 @@ def HEX(s):
return bytearray.fromhex(s)


def start_thread(target, args):
thread = threading.Thread(target=target, args=args)
thread.start()
return thread


def join_thread(thread):
thread.join(1.0)
assert thread.is_alive() is False


# =============================================================================
# Transmission Control Object
# =============================================================================
Expand Down Expand Up @@ -80,9 +69,8 @@ def test_send(self, tco, pdu):
assert tco.dequeue(1, 0, False) is None
assert tco.dequeue(10, 4, False) == pdu
assert tco.dequeue(10, 4) is None
thread = start_thread(tco.send, (pdu, 0))
assert tco.dequeue(10, 4) == pdu
join_thread(thread)
threading.Timer(0.01, tco.dequeue, (10, 4)).start()
tco.send(pdu, 0)

@pytest.mark.parametrize("pdu", [
nfc.llcp.pdu.UnnumberedInformation(1, 1, HEX('1122')),
Expand All @@ -96,9 +84,8 @@ def test_recv(self, tco, pdu):
assert tco.enqueue(pdu) is False
assert tco.recv() == pdu
assert tco.recv() == pdu
thread = start_thread(tco.recv, ())
assert tco.enqueue(pdu)
join_thread(thread)
threading.Timer(0.01, tco.enqueue, (pdu,)).start()
tco.recv()

def test_poll(self, tco):
pdu = nfc.llcp.pdu.UnnumberedInformation(1, 1, HEX('1122'))
Expand Down Expand Up @@ -162,9 +149,8 @@ def test_send(self, tco):
pdu = nfc.llcp.pdu.UnnumberedInformation(1, 1, HEX('1122'))
assert tco.send(pdu, flags=nfc.llcp.MSG_DONTWAIT) is True
assert tco.dequeue(10, 4) == pdu
thread = start_thread(tco.send, (pdu, 0))
assert tco.dequeue(10, 4) == pdu
join_thread(thread)
threading.Timer(0.01, tco.dequeue, (10, 4)).start()
assert tco.send(pdu, 0) is True
tco.close()
with pytest.raises(nfc.llcp.Error) as excinfo:
tco.send(pdu, 0)
Expand All @@ -181,3 +167,93 @@ def test_recv(self, tco):
with pytest.raises(nfc.llcp.Error) as excinfo:
tco.recv()
assert excinfo.value.errno == errno.ESHUTDOWN


# =============================================================================
# Logical Data Link
# =============================================================================
class TestLogicalDataLink:
@pytest.fixture
def tco(self):
tco = nfc.llcp.tco.LogicalDataLink(128)
assert tco.state.ESTABLISHED is True
assert tco.bind(1) == 1
yield tco
tco.close()
assert tco.state.SHUTDOWN is True

def test_init(self):
tco = nfc.llcp.tco.LogicalDataLink(100)
assert tco.send_miu == 128
assert tco.recv_miu == 100

def test_str(self):
tco = nfc.llcp.tco.LogicalDataLink(128)
assert str(tco) == "LDL None -> None"
tco.bind(1)
assert str(tco) == "LDL 1 -> None"
tco.connect(1)
assert str(tco) == "LDL 1 -> 1"

def test_sockopt(self, tco):
assert tco.getsockopt(nfc.llcp.SO_RCVBUF) == 1
tco.setsockopt(nfc.llcp.SO_RCVBUF, 2)
assert tco.getsockopt(nfc.llcp.SO_RCVBUF) == 2
tco.close()
with pytest.raises(nfc.llcp.Error) as excinfo:
tco.setsockopt(nfc.llcp.SO_RCVBUF, 2)
assert excinfo.value.errno == errno.ESHUTDOWN
with pytest.raises(nfc.llcp.Error) as excinfo:
tco.getsockopt(nfc.llcp.SO_RCVBUF)
assert excinfo.value.errno == errno.ESHUTDOWN

def test_connect(self, tco):
assert tco.connect(1) == 1
tco.close()
with pytest.raises(nfc.llcp.Error) as excinfo:
tco.connect(2)
assert excinfo.value.errno == errno.ESHUTDOWN

def test_poll(self, tco):
assert tco.poll("recv", 0.001) is False
assert tco.poll("send", 0.001) is True
with pytest.raises(nfc.llcp.Error) as excinfo:
tco.poll("invalid", 1)
assert excinfo.value.errno == errno.EINVAL
tco.close()
with pytest.raises(nfc.llcp.Error) as excinfo:
tco.poll("recv", 1)
assert excinfo.value.errno == errno.ESHUTDOWN

def test_sendto(self, tco):
pdu = nfc.llcp.pdu.UnnumberedInformation(1, 1, HEX('1122'))
assert tco.sendto(pdu.data, 1, flags=nfc.llcp.MSG_DONTWAIT) is True
assert tco.dequeue(10, 4) == pdu
assert tco.connect(2) is True
with pytest.raises(nfc.llcp.Error) as excinfo:
tco.sendto(pdu.data, 1, flags=nfc.llcp.MSG_DONTWAIT)
assert excinfo.value.errno == errno.EDESTADDRREQ
with pytest.raises(nfc.llcp.Error) as excinfo:
data = (tco.send_miu + 1) * HEX('11')
tco.sendto(data, 2, flags=nfc.llcp.MSG_DONTWAIT)
assert excinfo.value.errno == errno.EMSGSIZE
tco.close()
with pytest.raises(nfc.llcp.Error) as excinfo:
tco.sendto(pdu.data, 1, 0)
assert excinfo.value.errno == errno.ESHUTDOWN

def test_recvfrom(self, tco):
pdu = nfc.llcp.pdu.Symmetry()
assert tco.enqueue(pdu) is False
pdu = nfc.llcp.pdu.UnnumberedInformation(1, 1, (tco.recv_miu+1) * b'1')
assert tco.enqueue(pdu) is False
pdu = nfc.llcp.pdu.UnnumberedInformation(1, 1, HEX('1122'))
assert tco.enqueue(pdu) is True
assert tco.recvfrom() == (pdu.data, pdu.ssap)
threading.Timer(0.01, tco.close).start()
with pytest.raises(nfc.llcp.Error) as excinfo:
tco.recvfrom()
assert excinfo.value.errno == errno.EPIPE
with pytest.raises(nfc.llcp.Error) as excinfo:
tco.recvfrom()
assert excinfo.value.errno == errno.ESHUTDOWN

0 comments on commit 694a1fa

Please sign in to comment.