Skip to content

Commit

Permalink
Complete udp driver test coverage.
Browse files Browse the repository at this point in the history
  • Loading branch information
nehpetsde committed May 10, 2017
1 parent 29d72c0 commit da5e326
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 17 deletions.
26 changes: 10 additions & 16 deletions src/nfc/clf/udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ def mute(self):
self.socket = None

def sense_tta(self, target):
if not self.socket:
self._create_socket()
self._create_socket()

log.debug("sense_tta for %s on %s:%d", target, *self.addr)

Expand Down Expand Up @@ -156,8 +155,7 @@ def sense_tta(self, target):
log.debug(error)

def sense_ttb(self, target):
if not self.socket:
self._create_socket()
self._create_socket()

if target.brty not in ("106B", "212B", "424B"):
message = "unsupported bitrate {0}".format(target.brty)
Expand All @@ -178,8 +176,7 @@ def sense_ttb(self, target):
return nfc.clf.RemoteTarget(brty, sensb_res=sensb_res, _addr=addr)

def sense_ttf(self, target):
if not self.socket:
self._create_socket()
self._create_socket()

log.debug("sense_ttf for %s on %s:%d", target, *self.addr)

Expand Down Expand Up @@ -208,8 +205,7 @@ def sense_dep(self, target):
raise nfc.clf.UnsupportedTargetError(info.format(device=self))

def listen_tta(self, target, timeout):
if not self.socket:
self._create_socket()
self._create_socket()

log.debug("listen_tta for %.3f seconds on %s:%d", timeout, *self.addr)

Expand Down Expand Up @@ -291,8 +287,7 @@ def _listen_tta(self, target, time_to_return, init=None):
return target

def listen_ttb(self, target, timeout):
if not self.socket:
self._create_socket()
self._create_socket()

log.debug("listen_ttb for %.3f seconds on %s:%d", timeout, *self.addr)

Expand Down Expand Up @@ -324,8 +319,7 @@ def listen_ttb(self, target, timeout):
tt4_cmd=data, _addr=addr)

def listen_ttf(self, target, timeout):
if not self.socket:
self._create_socket()
self._create_socket()

log.debug("listen_ttf for %.3f seconds on %s:%d", timeout, *self.addr)

Expand Down Expand Up @@ -381,8 +375,7 @@ def _listen_ttf(self, target, time_to_return, init=None):
return target

def listen_dep(self, target, timeout):
if not self.socket:
self._create_socket()
self._create_socket()

log.debug("listen_dep for %.3f seconds on %s:%d", timeout, *self.addr)
assert target.sensf_res is not None
Expand Down Expand Up @@ -511,8 +504,9 @@ def get_max_recv_data_size(self, target):
return 290

def _create_socket(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sent_data = self.rcvd_data = 0
if self.socket is None:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sent_data = self.rcvd_data = 0

def _bind_socket(self, time_to_return):
addr = ('0.0.0.0', self.addr[1])
Expand Down
39 changes: 38 additions & 1 deletion tests/test_clf_udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,17 @@ def device(mocker):
assert device.addr == ('127.0.0.1', 54321)
device._device_name = "IP-Stack"
device._chipset_name = "UDP"
return device
yield device
device.close()


def test_init(mocker): # noqa: F811
nameinfo = ('127.0.0.1', '54321')
mocker.patch('nfc.clf.udp.select.select').return_value = ([1], [], [])
mocker.patch('nfc.clf.udp.socket.getnameinfo').return_value = nameinfo
mocker.patch('nfc.clf.udp.socket.socket')
device = nfc.clf.udp.init('localhost', 54321)
assert isinstance(device, nfc.clf.udp.Device)


class TestDevice(object):
Expand Down Expand Up @@ -1022,6 +1032,18 @@ def test_listen_dep_timeout_error(self, device):
assert device.listen_dep(target, 0.001) is None
assert device.socket.sendto.mock_calls == []

def test_listen_dep_socket_bind_error(self, device):
device.socket.bind.side_effect \
= nfc.clf.udp.socket.error(nfc.clf.udp.errno.EADDRINUSE, "test")
target = nfc.clf.LocalTarget()
target.sensf_res = HEX('01 01fe303132333435 0f0d23042f7783ff ffff')
target.sens_res = HEX('0000')
target.sdd_res = HEX('01020304')
target.sel_res = HEX('60')
target.atr_res = HEX('D501 d0d1d2d3d4d5d6d7d8d9 0000000800')
assert device.listen_dep(target, 1.0) is None
assert device.socket.sendto.mock_calls == []

#
# SEND/RECV DATA
#
Expand Down Expand Up @@ -1101,3 +1123,18 @@ def test_get_max_send_data_size(self, device):

def test_get_max_recv_data_size(self, device):
assert device.get_max_recv_data_size(None) == 290

#
# INTERNAL METHODS
#

def test_bind_socket(self, device):
assert device._bind_socket(nfc.clf.udp.time.time() - 1) is None
device.socket.bind.side_effect \
= nfc.clf.udp.socket.error(nfc.clf.udp.errno.EADDRINUSE, "test")
assert device._bind_socket(nfc.clf.udp.time.time() + 1) is False
device.socket.bind.side_effect \
= nfc.clf.udp.socket.error(nfc.clf.udp.errno.ENODEV, "test")
with pytest.raises(nfc.clf.udp.socket.error) as excinfo:
device._bind_socket(nfc.clf.udp.time.time() + 1)
assert excinfo.value.errno == nfc.clf.udp.errno.ENODEV

0 comments on commit da5e326

Please sign in to comment.