Permalink
3debab2 Jun 20, 2016
1 contributor

Users who have contributed to this file

234 lines (188 sloc) 7.76 KB
#! /usr/bin/env python
#
# This file is part of pySerial - Cross platform serial port support for Python
# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
"""\
Some tests for the serial module.
Part of pyserial (http://pyserial.sf.net) (C)2001-2015 cliechti@gmx.net
Intended to be run on different platforms, to ensure portability of
the code.
For all these tests a simple hardware is required.
Loopback HW adapter:
Shortcut these pin pairs:
TX <-> RX
RTS <-> CTS
DTR <-> DSR
On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
"""
import unittest
import threading
import time
import sys
import serial
# on which port should the tests be performed:
PORT = 'loop://'
# indirection via bytearray b/c bytes(range(256)) does something else in Pyhton 2.7
bytes_0to255 = bytes(bytearray(range(256)))
def segments(data, size=16):
for a in range(0, len(data), size):
yield data[a:a + size]
class Test4_Nonblocking(unittest.TestCase):
"""Test with timeouts"""
timeout = 0
def setUp(self):
self.s = serial.serial_for_url(PORT, timeout=self.timeout)
def tearDown(self):
self.s.close()
def test0_Messy(self):
"""NonBlocking (timeout=0)"""
# this is only here to write out the message in verbose mode
# because Test3 and Test4 print the same messages
def test1_ReadEmpty(self):
"""timeout: After port open, the input buffer must be empty"""
self.assertEqual(self.s.read(1), b'', "expected empty buffer")
def test2_Loopback(self):
"""timeout: each sent character should return (binary test).
this is also a test for the binary capability of a port."""
for block in segments(bytes_0to255):
length = len(block)
self.s.write(block)
# there might be a small delay until the character is ready (especially on win32)
time.sleep(0.05)
self.assertEqual(self.s.in_waiting, length, "expected exactly {} character for inWainting()".format(length))
self.assertEqual(self.s.read(length), block) #, "expected a %r which was written before" % block)
self.assertEqual(self.s.read(1), b'', "expected empty buffer after all sent chars are read")
def test2_LoopbackTimeout(self):
"""timeout: test the timeout/immediate return.
partial results should be returned."""
self.s.write(b"HELLO")
time.sleep(0.1) # there might be a small delay until the character is ready (especially on win32 and rfc2217)
# read more characters as are available to run in the timeout
self.assertEqual(self.s.read(10), b'HELLO', "expected the 'HELLO' which was written before")
self.assertEqual(self.s.read(1), b'', "expected empty buffer after all sent chars are read")
class Test3_Timeout(Test4_Nonblocking):
"""Same tests as the NonBlocking ones but this time with timeout"""
timeout = 1
def test0_Messy(self):
"""Blocking (timeout=1)"""
# this is only here to write out the message in verbose mode
# because Test3 and Test4 print the same messages
class SendEvent(threading.Thread):
def __init__(self, serial, delay=3):
threading.Thread.__init__(self)
self.serial = serial
self.delay = delay
self.x = threading.Event()
self.stopped = 0
self.start()
def run(self):
time.sleep(self.delay)
self.x.set()
if not self.stopped:
self.serial.write(b"E")
self.serial.flush()
def isSet(self):
return self.x.isSet()
def stop(self):
self.stopped = 1
self.x.wait()
class Test1_Forever(unittest.TestCase):
"""Tests a port with no timeout. These tests require that a
character is sent after some time to stop the test, this is done
through the SendEvent class and the Loopback HW."""
def setUp(self):
self.s = serial.serial_for_url(PORT, timeout=None)
self.event = SendEvent(self.s)
def tearDown(self):
self.event.stop()
self.s.close()
def test2_ReadEmpty(self):
"""no timeout: after port open, the input buffer must be empty (read).
a character is sent after some time to terminate the test (SendEvent)."""
c = self.s.read(1)
if not (self.event.isSet() and c == b'E'):
self.fail("expected marker (evt={!r}, c={!r})".format(self.event.isSet(), c))
class Test2_Forever(unittest.TestCase):
"""Tests a port with no timeout"""
def setUp(self):
self.s = serial.serial_for_url(PORT, timeout=None)
def tearDown(self):
self.s.close()
def test1_inWaitingEmpty(self):
"""no timeout: after port open, the input buffer must be empty (in_waiting)"""
self.assertEqual(self.s.in_waiting, 0, "expected empty buffer")
def test2_Loopback(self):
"""no timeout: each sent character should return (binary test).
this is also a test for the binary capability of a port."""
for block in segments(bytes_0to255):
length = len(block)
self.s.write(block)
# there might be a small delay until the character is ready (especially on win32 and rfc2217)
time.sleep(0.05)
self.assertEqual(self.s.in_waiting, length) #, "expected exactly %d character for inWainting()" % length)
self.assertEqual(self.s.read(length), block) #, "expected %r which was written before" % block)
self.assertEqual(self.s.in_waiting, 0, "expected empty buffer after all sent chars are read")
class Test0_DataWires(unittest.TestCase):
"""Test modem control lines"""
def setUp(self):
self.s = serial.serial_for_url(PORT)
def tearDown(self):
self.s.close()
def test1_RTS(self):
"""Test RTS/CTS"""
self.s.rts = False
time.sleep(1.1)
self.assertTrue(not self.s.cts, "CTS -> 0")
self.s.rts = True
time.sleep(1.1)
self.assertTrue(self.s.cts, "CTS -> 1")
def test2_DTR(self):
"""Test DTR/DSR"""
self.s.dtr = False
time.sleep(1.1)
self.assertTrue(not self.s.dsr, "DSR -> 0")
self.s.dtr = True
time.sleep(1.1)
self.assertTrue(self.s.dsr, "DSR -> 1")
def test3_RI(self):
"""Test RI"""
self.assertTrue(not self.s.ri, "RI -> 0")
class Test_MoreTimeouts(unittest.TestCase):
"""Test with timeouts"""
def setUp(self):
# create an closed serial port
self.s = serial.serial_for_url(PORT, do_not_open=True)
def tearDown(self):
self.s.reset_output_buffer()
self.s.flush()
#~ self.s.write(serial.XON)
self.s.close()
# reopen... some faulty USB-serial adapter make next test fail otherwise...
self.s.timeout = 1
self.s.xonxoff = False
self.s.open()
self.s.read(3000)
self.s.close()
def test_WriteTimeout(self):
"""Test write() timeout."""
# use xonxoff setting and the loop-back adapter to switch traffic on hold
self.s.port = PORT
self.s.write_timeout = 1.0
self.s.xonxoff = True
self.s.open()
self.s.write(serial.XOFF)
time.sleep(0.5) # some systems need a little delay so that they can react on XOFF
t1 = time.time()
self.assertRaises(serial.SerialTimeoutException, self.s.write, b"timeout please" * 200)
t2 = time.time()
self.assertTrue(0.9 <= (t2 - t1) < 2.1, "Timeout not in the given interval ({})".format(t2 - t1))
if __name__ == '__main__':
sys.stdout.write(__doc__)
if len(sys.argv) > 1:
PORT = sys.argv[1]
sys.stdout.write("Testing port: {!r}\n".format(PORT))
sys.argv[1:] = ['-v']
# When this module is executed from the command-line, it runs all its tests
unittest.main()