Permalink
Browse files

serial port enumeration code added

  • Loading branch information...
1 parent 72fb044 commit 9c3045c3483782ea75c0f3a3258dd18933521b8a @stefanix committed Mar 28, 2012
View
16 backend/app.py
@@ -15,6 +15,7 @@
VERSION = "v12.02-beta2"
SERIAL_PORT = None
BITSPERSECOND = 9600
+NETWORK_PORT = 4444
CONFIG_FILE = "lasaurapp.conf"
GUESS_PPREFIX = "tty.usbmodem"
COOKIE_KEY = 'secret_key_jkn23489hsdf'
@@ -36,27 +37,27 @@ def data_root():
-def run_with_callback(host, port=4444, timeout=0.01):
+def run_with_callback(host):
""" Start a wsgiref server instance with control over the main loop.
This is a function that I derived from the bottle.py run()
"""
handler = default_app()
- server = wsgiref.simple_server.make_server(host, port, handler)
- server.timeout = timeout
+ server = wsgiref.simple_server.make_server(host, NETWORK_PORT, handler)
+ server.timeout = 0.01
print "-----------------------------------------------------------------------------"
print "Bottle server starting up ..."
print "Serial is set to %d bps" % BITSPERSECOND
print "Point your browser to: "
- print "http://%s:%d/ (local)" % ('127.0.0.1', port)
+ print "http://%s:%d/ (local)" % ('127.0.0.1', NETWORK_PORT)
if host == '':
- print "http://%s:%d/ (public)" % (socket.gethostbyname(socket.gethostname()), port)
+ print "http://%s:%d/ (public)" % (socket.gethostbyname(socket.gethostname()), NETWORK_PORT)
print "Use Ctrl-C to quit."
print "-----------------------------------------------------------------------------"
print
try:
- webbrowser.open_new_tab('http://127.0.0.1:'+str(port))
+ webbrowser.open_new_tab('http://127.0.0.1:'+str(NETWORK_PORT))
except webbrowser.Error:
- print "Cannot open Webbrowser, please to so manually."
+ print "Cannot open Webbrowser, please do so manually."
while 1:
try:
SerialManager.send_queue_as_ready()
@@ -246,6 +247,7 @@ def queue_pct_done_handler():
else:
run_with_callback('127.0.0.1')
else:
+ SerialManager.list_devices()
print "-----------------------------------------------------------------------------"
print "ERROR: LasaurApp doesn't know what serial device to connect to!"
print "On Linux or OSX this is something like '/dev/tty.usbmodemfd121' and on"
View
99 backend/serial_list_ports.py
@@ -0,0 +1,99 @@
+#!/usr/bin/env python
+
+# portable serial port access with python
+# this is a wrapper module for different platform implementations of the
+# port enumeration feature
+#
+# (C) 2011 Chris Liechti <cliechti@gmx.net>
+# this is distributed under a free software license, see license.txt
+
+"""\
+This module will provide a function called comports that returns an
+iterable (generator or list) that will enumerate available com ports. Note that
+on some systems non-existent ports may be listed.
+
+Additionally a grep function is supplied that can be used to search for ports
+based on their descriptions or hardware ID.
+"""
+
+import sys, os, re
+
+# chose an implementation, depending on os
+#~ if sys.platform == 'cli':
+#~ else:
+import os
+# chose an implementation, depending on os
+if os.name == 'nt': #sys.platform == 'win32':
+ from serial_list_ports_windows import *
+elif os.name == 'posix':
+ from serial_list_ports_posix import *
+#~ elif os.name == 'java':
+else:
+ raise ImportError("Sorry: no implementation for your platform ('%s') available" % (os.name,))
+
+
+def grep(regexp):
+ """\
+ Search for ports using a regular expression. Port name, description and
+ hardware ID are searched. The function returns an iterable that returns the
+ same tuples as comport() would do.
+ """
+ for port, desc, hwid in comports():
+ if re.search(regexp, port, re.I) or re.search(regexp, desc) or re.search(regexp, hwid):
+ yield port, desc, hwid
+
+
+def main():
+ import optparse
+
+ parser = optparse.OptionParser(
+ usage = "%prog [options] [<regexp>]",
+ description = "Miniterm - A simple terminal program for the serial port."
+ )
+
+ parser.add_option("--debug",
+ help="print debug messages and tracebacks (development mode)",
+ dest="debug",
+ default=False,
+ action='store_true')
+
+ parser.add_option("-v", "--verbose",
+ help="show more messages (can be given multiple times)",
+ dest="verbose",
+ default=1,
+ action='count')
+
+ parser.add_option("-q", "--quiet",
+ help="suppress all messages",
+ dest="verbose",
+ action='store_const',
+ const=0)
+
+ (options, args) = parser.parse_args()
+
+
+ hits = 0
+ # get iteraror w/ or w/o filter
+ if args:
+ if len(args) > 1:
+ parser.error('more than one regexp not supported')
+ print "Filtered list with regexp: %r" % (args[0],)
+ iterator = sorted(grep(args[0]))
+ else:
+ iterator = sorted(comports())
+ # list them
+ for port, desc, hwid in iterator:
+ print "%-20s" % (port,)
+ if options.verbose > 1:
+ print " desc: %s" % (desc,)
+ print " hwid: %s" % (hwid,)
+ hits += 1
+ if options.verbose:
+ if hits:
+ print "%d ports found" % (hits,)
+ else:
+ print "no ports found"
+
+# test
+if __name__ == '__main__':
+ main()
View
197 backend/serial_list_ports_posix.py
@@ -0,0 +1,197 @@
+import glob
+import sys
+import os
+import re
+
+try:
+ import subprocess
+except ImportError:
+ def popen(argv):
+ try:
+ si, so = os.popen4(' '.join(argv))
+ return so.read().strip()
+ except:
+ raise IOError('lsusb failed')
+else:
+ def popen(argv):
+ try:
+ return subprocess.check_output(argv, stderr=subprocess.STDOUT).strip()
+ except:
+ raise IOError('lsusb failed')
+
+
+# The comports function is expected to return an iterable that yields tuples of
+# 3 strings: port name, human readable description and a hardware ID.
+#
+# as currently no method is known to get the second two strings easily, they
+# are currently just identical to the port name.
+
+# try to detect the OS so that a device can be selected...
+plat = sys.platform.lower()
+
+def read_line(filename):
+ """help function to read a single line from a file. returns none"""
+ try:
+ f = open(filename)
+ line = f.readline().strip()
+ f.close()
+ return line
+ except IOError:
+ return None
+
+def re_group(regexp, text):
+ """search for regexp in text, return 1st group on match"""
+ m = re.search(regexp, text)
+ if m: return m.group(1)
+
+
+if plat[:5] == 'linux': # Linux (confirmed)
+ # try to extract descriptions from sysfs. this was done by experimenting,
+ # no guarantee that it works for all devices or in the future...
+
+ def usb_sysfs_hw_string(sysfs_path):
+ """given a path to a usb device in sysfs, return a string describing it"""
+ bus, dev = os.path.basename(os.path.realpath(sysfs_path)).split('-')
+ snr = read_line(sysfs_path+'/serial')
+ if snr:
+ snr_txt = ' SNR=%s' % (snr,)
+ else:
+ snr_txt = ''
+ return 'USB VID:PID=%s:%s%s' % (
+ read_line(sysfs_path+'/idVendor'),
+ read_line(sysfs_path+'/idProduct'),
+ snr_txt
+ )
+
+ def usb_lsusb_string(sysfs_path):
+ bus, dev = os.path.basename(os.path.realpath(sysfs_path)).split('-')
+ try:
+ desc = popen(['lsusb', '-v', '-s', '%s:%s' % (bus, dev)])
+ # descriptions from device
+ iManufacturer = re_group('iManufacturer\s+\w+ (.+)', desc)
+ iProduct = re_group('iProduct\s+\w+ (.+)', desc)
+ iSerial = re_group('iSerial\s+\w+ (.+)', desc) or ''
+ # descriptions from kernel
+ idVendor = re_group('idVendor\s+0x\w+ (.+)', desc)
+ idProduct = re_group('idProduct\s+0x\w+ (.+)', desc)
+ # create descriptions. prefer text from device, fall back to the others
+ return '%s %s %s' % (iManufacturer or idVendor, iProduct or idProduct, iSerial)
+ except IOError:
+ return base
+
+ def describe(device):
+ """\
+ Get a human readable description.
+ For USB-Serial devices try to run lsusb to get a human readable description.
+ For USB-CDC devices read the description from sysfs.
+ """
+ base = os.path.basename(device)
+ # USB-Serial devices
+ sys_dev_path = '/sys/class/tty/%s/device/driver/%s' % (base, base)
+ if os.path.exists(sys_dev_path):
+ sys_usb = os.path.dirname(os.path.dirname(os.path.realpath(sys_dev_path)))
+ return usb_lsusb_string(sys_usb)
+ # USB-CDC devices
+ sys_dev_path = '/sys/class/tty/%s/device/interface' % (base,)
+ if os.path.exists(sys_dev_path):
+ return read_line(sys_dev_path)
+ return base
+
+ def hwinfo(device):
+ """Try to get a HW identification using sysfs"""
+ base = os.path.basename(device)
+ if os.path.exists('/sys/class/tty/%s/device' % (base,)):
+ # PCI based devices
+ sys_id_path = '/sys/class/tty/%s/device/id' % (base,)
+ if os.path.exists(sys_id_path):
+ return read_line(sys_id_path)
+ # USB-Serial devices
+ sys_dev_path = '/sys/class/tty/%s/device/driver/%s' % (base, base)
+ if os.path.exists(sys_dev_path):
+ sys_usb = os.path.dirname(os.path.dirname(os.path.realpath(sys_dev_path)))
+ return usb_sysfs_hw_string(sys_usb)
+ # USB-CDC devices
+ if base.startswith('ttyACM'):
+ sys_dev_path = '/sys/class/tty/%s/device' % (base,)
+ if os.path.exists(sys_dev_path):
+ return usb_sysfs_hw_string(sys_dev_path + '/..')
+ return 'n/a' # XXX directly remove these from the list?
+
+ def comports():
+ devices = glob.glob('/dev/ttyS*') + glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*')
+ return [(d, describe(d), hwinfo(d)) for d in devices]
+
+elif plat == 'cygwin': # cygwin/win32
+ def comports():
+ devices = glob.glob('/dev/com*')
+ return [(d, d, d) for d in devices]
+
+elif plat == 'openbsd3': # BSD
+ def comports():
+ devices = glob.glob('/dev/ttyp*')
+ return [(d, d, d) for d in devices]
+
+elif plat[:3] == 'bsd' or \
+ plat[:7] == 'freebsd' or \
+ plat[:7] == 'openbsd': # BSD
+
+ def comports():
+ devices = glob.glob('/dev/cuad*')
+ return [(d, d, d) for d in devices]
+
+elif plat[:6] == 'darwin': # OS X (confirmed)
+ def comports():
+ """scan for available ports. return a list of device names."""
+ devices = glob.glob('/dev/tty.*')
+ return [(d, d, d) for d in devices]
+
+elif plat[:6] == 'netbsd': # NetBSD
+ def comports():
+ """scan for available ports. return a list of device names."""
+ devices = glob.glob('/dev/dty*')
+ return [(d, d, d) for d in devices]
+
+elif plat[:4] == 'irix': # IRIX
+ def comports():
+ """scan for available ports. return a list of device names."""
+ devices = glob.glob('/dev/ttyf*')
+ return [(d, d, d) for d in devices]
+
+elif plat[:2] == 'hp': # HP-UX (not tested)
+ def comports():
+ """scan for available ports. return a list of device names."""
+ devices = glob.glob('/dev/tty*p0')
+ return [(d, d, d) for d in devices]
+
+elif plat[:5] == 'sunos': # Solaris/SunOS
+ def comports():
+ """scan for available ports. return a list of device names."""
+ devices = glob.glob('/dev/tty*c')
+ return [(d, d, d) for d in devices]
+
+elif plat[:3] == 'aix': # AIX
+ def comports():
+ """scan for available ports. return a list of device names."""
+ devices = glob.glob('/dev/tty*')
+ return [(d, d, d) for d in devices]
+
+else:
+ # platform detection has failed...
+ sys.stderr.write("""\
+don't know how to enumerate ttys on this system.
+! I you know how the serial ports are named send this information to
+! the author of this module:
+
+sys.platform = %r
+os.name = %r
+pySerial version = %s
+
+also add the naming scheme of the serial ports and with a bit luck you can get
+this module running...
+""" % (sys.platform, os.name, serial.VERSION))
+ raise ImportError("Sorry: no implementation for your platform ('%s') available" % (os.name,))
+
+# test
+if __name__ == '__main__':
+ for port, desc, hwid in sorted(comports()):
+ print "%s: %s [%s]" % (port, desc, hwid)
View
209 backend/serial_list_ports_windows.py
@@ -0,0 +1,209 @@
+import ctypes
+import re
+
+def ValidHandle(value, func, arguments):
+ if value == 0:
+ raise ctypes.WinError()
+ return value
+
+import serial
+from serial.win32 import ULONG_PTR, is_64bit
+from ctypes.wintypes import HANDLE
+from ctypes.wintypes import BOOL
+from ctypes.wintypes import HWND
+from ctypes.wintypes import DWORD
+from ctypes.wintypes import WORD
+from ctypes.wintypes import LONG
+from ctypes.wintypes import ULONG
+from ctypes.wintypes import LPCSTR
+from ctypes.wintypes import HKEY
+from ctypes.wintypes import BYTE
+
+NULL = 0
+HDEVINFO = ctypes.c_void_p
+PCTSTR = ctypes.c_char_p
+CHAR = ctypes.c_char
+LPDWORD = PDWORD = ctypes.POINTER(DWORD)
+#~ LPBYTE = PBYTE = ctypes.POINTER(BYTE)
+LPBYTE = PBYTE = ctypes.c_void_p # XXX avoids error about types
+PHKEY = ctypes.POINTER(HKEY)
+
+ACCESS_MASK = DWORD
+REGSAM = ACCESS_MASK
+
+
+def byte_buffer(length):
+ """Get a buffer for a string"""
+ return (BYTE*length)()
+
+def string(buffer):
+ s = []
+ for c in buffer:
+ if c == 0: break
+ s.append(chr(c & 0xff)) # "& 0xff": hack to convert signed to unsigned
+ return ''.join(s)
+
+
+class GUID(ctypes.Structure):
+ _fields_ = [
+ ('Data1', DWORD),
+ ('Data2', WORD),
+ ('Data3', WORD),
+ ('Data4', BYTE*8),
+ ]
+ def __str__(self):
+ return "{%08x-%04x-%04x-%s-%s}" % (
+ self.Data1,
+ self.Data2,
+ self.Data3,
+ ''.join(["%02x" % d for d in self.Data4[:2]]),
+ ''.join(["%02x" % d for d in self.Data4[2:]]),
+ )
+
+class SP_DEVINFO_DATA(ctypes.Structure):
+ _fields_ = [
+ ('cbSize', DWORD),
+ ('ClassGuid', GUID),
+ ('DevInst', DWORD),
+ ('Reserved', ULONG_PTR),
+ ]
+ def __str__(self):
+ return "ClassGuid:%s DevInst:%s" % (self.ClassGuid, self.DevInst)
+PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA)
+
+class SP_DEVICE_INTERFACE_DATA(ctypes.Structure):
+ _fields_ = [
+ ('cbSize', DWORD),
+ ('InterfaceClassGuid', GUID),
+ ('Flags', DWORD),
+ ('Reserved', ULONG_PTR),
+ ]
+ def __str__(self):
+ return "InterfaceClassGuid:%s Flags:%s" % (self.InterfaceClassGuid, self.Flags)
+PSP_DEVICE_INTERFACE_DATA = ctypes.POINTER(SP_DEVICE_INTERFACE_DATA)
+
+PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p
+
+setupapi = ctypes.windll.LoadLibrary("setupapi")
+SetupDiDestroyDeviceInfoList = setupapi.SetupDiDestroyDeviceInfoList
+SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO]
+SetupDiDestroyDeviceInfoList.restype = BOOL
+
+SetupDiGetClassDevs = setupapi.SetupDiGetClassDevsA
+SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD]
+SetupDiGetClassDevs.restype = HDEVINFO
+SetupDiGetClassDevs.errcheck = ValidHandle
+
+SetupDiEnumDeviceInterfaces = setupapi.SetupDiEnumDeviceInterfaces
+SetupDiEnumDeviceInterfaces.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, ctypes.POINTER(GUID), DWORD, PSP_DEVICE_INTERFACE_DATA]
+SetupDiEnumDeviceInterfaces.restype = BOOL
+
+SetupDiGetDeviceInterfaceDetail = setupapi.SetupDiGetDeviceInterfaceDetailA
+SetupDiGetDeviceInterfaceDetail.argtypes = [HDEVINFO, PSP_DEVICE_INTERFACE_DATA, PSP_DEVICE_INTERFACE_DETAIL_DATA, DWORD, PDWORD, PSP_DEVINFO_DATA]
+SetupDiGetDeviceInterfaceDetail.restype = BOOL
+
+SetupDiGetDeviceRegistryProperty = setupapi.SetupDiGetDeviceRegistryPropertyA
+SetupDiGetDeviceRegistryProperty.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD]
+SetupDiGetDeviceRegistryProperty.restype = BOOL
+
+SetupDiOpenDevRegKey = setupapi.SetupDiOpenDevRegKey
+SetupDiOpenDevRegKey.argtypes = [HDEVINFO, PSP_DEVINFO_DATA, DWORD, DWORD, DWORD, REGSAM]
+SetupDiOpenDevRegKey.restype = HKEY
+
+advapi32 = ctypes.windll.LoadLibrary("Advapi32")
+RegCloseKey = advapi32.RegCloseKey
+RegCloseKey.argtypes = [HKEY]
+RegCloseKey.restype = LONG
+
+RegQueryValueEx = advapi32.RegQueryValueExA
+RegQueryValueEx.argtypes = [HKEY, LPCSTR, LPDWORD, LPDWORD, LPBYTE, LPDWORD]
+RegQueryValueEx.restype = LONG
+
+
+GUID_CLASS_COMPORT = GUID(0x86e0d1e0L, 0x8089, 0x11d0,
+ (BYTE*8)(0x9c, 0xe4, 0x08, 0x00, 0x3e, 0x30, 0x1f, 0x73))
+
+DIGCF_PRESENT = 2
+DIGCF_DEVICEINTERFACE = 16
+INVALID_HANDLE_VALUE = 0
+ERROR_INSUFFICIENT_BUFFER = 122
+SPDRP_HARDWAREID = 1
+SPDRP_FRIENDLYNAME = 12
+ERROR_NO_MORE_ITEMS = 259
+DICS_FLAG_GLOBAL = 1
+DIREG_DEV = 0x00000001
+KEY_READ = 0x20019
+REG_SZ = 1
+
+# workaround for compatibility between Python 2.x and 3.x
+PortName = serial.to_bytes([80, 111, 114, 116, 78, 97, 109, 101]) # "PortName"
+
+def comports():
+ """This generator scans the device registry for com ports and yields port, desc, hwid"""
+ g_hdi = SetupDiGetClassDevs(ctypes.byref(GUID_CLASS_COMPORT), None, NULL, DIGCF_PRESENT|DIGCF_DEVICEINTERFACE);
+ #~ for i in range(256):
+ for dwIndex in range(256):
+ did = SP_DEVICE_INTERFACE_DATA()
+ did.cbSize = ctypes.sizeof(did)
+
+ if not SetupDiEnumDeviceInterfaces(g_hdi, None, ctypes.byref(GUID_CLASS_COMPORT), dwIndex, ctypes.byref(did)):
+ if ctypes.GetLastError() != ERROR_NO_MORE_ITEMS:
+ raise ctypes.WinError()
+ break
+
+ dwNeeded = DWORD()
+ # get the size
+ if not SetupDiGetDeviceInterfaceDetail(g_hdi, ctypes.byref(did), None, 0, ctypes.byref(dwNeeded), None):
+ # Ignore ERROR_INSUFFICIENT_BUFFER
+ if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
+ raise ctypes.WinError()
+ # allocate buffer
+ class SP_DEVICE_INTERFACE_DETAIL_DATA_A(ctypes.Structure):
+ _fields_ = [
+ ('cbSize', DWORD),
+ ('DevicePath', CHAR*(dwNeeded.value - ctypes.sizeof(DWORD))),
+ ]
+ def __str__(self):
+ return "DevicePath:%s" % (self.DevicePath,)
+ idd = SP_DEVICE_INTERFACE_DETAIL_DATA_A()
+ if is_64bit():
+ idd.cbSize = 8
+ else:
+ idd.cbSize = 5
+ devinfo = SP_DEVINFO_DATA()
+ devinfo.cbSize = ctypes.sizeof(devinfo)
+ if not SetupDiGetDeviceInterfaceDetail(g_hdi, ctypes.byref(did), ctypes.byref(idd), dwNeeded, None, ctypes.byref(devinfo)):
+ raise ctypes.WinError()
+
+ # hardware ID
+ szHardwareID = byte_buffer(250)
+ if not SetupDiGetDeviceRegistryProperty(g_hdi, ctypes.byref(devinfo), SPDRP_HARDWAREID, None, ctypes.byref(szHardwareID), ctypes.sizeof(szHardwareID) - 1, None):
+ # Ignore ERROR_INSUFFICIENT_BUFFER
+ if GetLastError() != ERROR_INSUFFICIENT_BUFFER:
+ raise ctypes.WinError()
+
+ # friendly name
+ szFriendlyName = byte_buffer(250)
+ if not SetupDiGetDeviceRegistryProperty(g_hdi, ctypes.byref(devinfo), SPDRP_FRIENDLYNAME, None, ctypes.byref(szFriendlyName), ctypes.sizeof(szFriendlyName) - 1, None):
+ # Ignore ERROR_INSUFFICIENT_BUFFER
+ if ctypes.GetLastError() != ERROR_INSUFFICIENT_BUFFER:
+ #~ raise IOError("failed to get details for %s (%s)" % (devinfo, szHardwareID.value))
+ port_name = None
+ else:
+ # the real com port name has to read differently...
+ hkey = SetupDiOpenDevRegKey(g_hdi, ctypes.byref(devinfo), DICS_FLAG_GLOBAL, 0, DIREG_DEV, KEY_READ)
+ port_name_buffer = byte_buffer(250)
+ port_name_length = ULONG(ctypes.sizeof(port_name_buffer))
+ RegQueryValueEx(hkey, PortName, None, None, ctypes.byref(port_name_buffer), ctypes.byref(port_name_length))
+ RegCloseKey(hkey)
+ yield string(port_name_buffer), string(szFriendlyName), string(szHardwareID)
+
+ SetupDiDestroyDeviceInfoList(g_hdi)
+
+
+# test
+if __name__ == '__main__':
+ import serial
+
+ for port, desc, hwid in sorted(comports()):
+ print "%s: %s [%s]" % (port, desc, hwid)
View
8 backend/serial_manager.py
@@ -3,7 +3,7 @@
import time
import serial
from collections import deque
-
+from serial_list_ports import grep
class SerialManagerClass:
@@ -31,6 +31,12 @@ def __init__(self):
self.job_active = False
+ def list_devices(self):
+ iterator = sorted(grep('tty'))
+ for port, desc, hwid in iterator:
+ print "%-20s" % (port,)
+ print " desc: %s" % (desc,)
+ print " hwid: %s" % (hwid,)
def connect(self, port, baudrate):
self.rx_buffer = ""

0 comments on commit 9c3045c

Please sign in to comment.