/
linux_kernel.py
executable file
·80 lines (66 loc) · 2.71 KB
/
linux_kernel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/usr/bin/env python
"""
Backend to support Brother QL-series printers via the linux kernel USB printer interface.
Works on Linux.
"""
import glob, os, time, select
from .generic import BrotherQLBackendGeneric
def list_available_devices():
"""
List all available devices for the linux kernel backend
returns: devices: a list of dictionaries with the keys 'identifier' and 'instance': \
[ {'identifier': 'file:///dev/usb/lp0', 'instance': None}, ] \
Instance is set to None because we don't want to open (and thus potentially block) the device here.
"""
paths = glob.glob('/dev/usb/lp*')
return [{'identifier': 'file://' + path, 'instance': None} for path in paths]
class BrotherQLBackendLinuxKernel(BrotherQLBackendGeneric):
"""
BrotherQL backend using the Linux Kernel USB Printer Device Handles
"""
def __init__(self, device_specifier):
"""
device_specifier: string or os.open(): identifier in the \
format file:///dev/usb/lp0 or os.open() raw device handle.
"""
self.read_timeout = 0.01
# strategy : try_twice or select
self.strategy = 'select'
if isinstance(device_specifier, str):
if device_specifier.startswith('file://'):
device_specifier = device_specifier[7:]
self.dev = os.open(device_specifier, os.O_RDWR)
elif isinstance(device_specifier, int):
self.dev = device_specifier
else:
raise NotImplementedError('Currently the printer can be specified either via an appropriate string or via an os.open() handle.')
self.write_dev = self.dev
self.read_dev = self.dev
def _write(self, data):
os.write(self.write_dev, data)
def _read(self, length=32):
if self.strategy == 'try_twice':
data = os.read(self.read_dev, length)
if data:
return data
else:
time.sleep(self.read_timeout)
return os.read(self.read_dev, length)
elif self.strategy == 'select':
data = b''
start = time.time()
while (not data) and (time.time() - start < self.read_timeout):
result, _, _ = select.select([self.read_dev], [], [], 0)
if self.read_dev in result:
data += os.read(self.read_dev, length)
if data: break
time.sleep(0.001)
if not data:
# one last try if still no data:
return os.read(self.read_dev, length)
else:
return data
else:
raise NotImplementedError('Unknown strategy')
def _dispose(self):
os.close(self.dev)