/
lircd2uinput
executable file
·260 lines (234 loc) · 8.94 KB
/
lircd2uinput
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
#!/usr/bin/python3
# vim: set fileencoding=utf-8
# Alexander Grothe 2011 - 2016
#
# This script requires python-uinput V 0.6.1. or higher. Additional required
# packages are libudev0 and libudev-dev.
#
# Fetch the code for python-uinput from git: ###
#
# git clone git://github.com/tuomasjjrasanen/python-uinput.git cd python-uinput
# git clone git://github.com/tuomasjjrasanen/libsuinput.git sudo python setup.py
# install
#
# This script must be run as superuser or with sufficent rights to create an
# uinput device and exspects a lircd socket using pid from
# /var/run/lirc/lircd.pid under /var/run/lirc/lircd.<pid of lircd> if none is
# given by --lircd-socket
# /PATH/TO/LIRCD_SOCKET lircd must not be startet with
# --uinput, but may be started with --release="_up" to prevent ghosting events
# if necessary.
import dbus.service
from dbus.mainloop.glib import DBusGMainLoop
import logging
import socket
from gi.repository import GObject
import os
import sys
import time
import uinput
import datetime
from optparse import OptionParser
class Lirc2uinput(uinput.Device):
keydown = False
lastkey = None
events = []
"""Sends keystrokes to a virtual uinput device after applying
a repeat-filter"""
def __init__(self, uinput_name="lircd", options=None):
self.lircd_socket = options.lircd_socket
self.timestamp = datetime.datetime.now()
self.options = options
# add all defined KEY_.* definitions to supported key events
self.events = (key_value for key, key_value in uinput.__dict__.items()
if key.startswith('KEY_'))
# create uinput device
super().__init__(self.events, uinput_name)
def getKeyname(self, key):
try:
keycmd = getattr(uinput, key.upper())
except AttributeError:
keycmd = uinput.KEY_COFFEE
logging.debug(("Keyname %s is not supported by your input.h, "
"get a coffee ;)"), key)
return keycmd
def send_key(self, key, count):
keycmd = self.getKeyname(key)
# logging.debug("send_key: %s, lastkey: %s", keycmd, self.lastkey)
if self.lastkey == keycmd and count > 0:
# logging.debug("repeated key")
self.emit(keycmd, 2, syn=True)
elif self.keydown and count == 0:
self.emit(keycmd, 0, syn=False)
self.emit(keycmd, 1, syn=True)
else:
self.emit(keycmd, 1, syn=True)
self.keydown = True
self.lastkey = keycmd
return keycmd
def release_key(self, keycmd):
if self.keydown:
keycmd = self.getKeyname(keycmd)
self.emit(keycmd, 0, syn=True)
self.lastkey = None
logging.debug('released key %s', keycmd)
self.keydown = False
return False
class SocketConnection:
buf = ""
lastcmd = None
def __init__(self, socket_path, socket_connection, uinputdev, options, r_suffix=""):
"""this function needs the path to a socket, a connected socket instance,
and an instance of Lircd2uinput"""
self.socket_path = socket_path
self.socket = socket_connection
self.uinputdev = uinputdev
self.options = options
self.r_suffix = r_suffix
self.io = GObject.io_add_watch(self.socket, GObject.IO_IN, self.handler)
def still_alive(self):
if not os.path.exists(self.socket_path):
logging.debug("%s does not exist anymore, removing",
self.socket_path)
self.cleanup()
return False
else:
return True
def cleanup(self):
GObject.source_remove(self.io)
self.socket.close()
def handler(self, sock, *args):
self.buf += sock.recv(1024).decode()
if self.buf:
lines = filter(None, self.buf.splitlines(True))
else:
return True # exit if we got nothing
for n, line in enumerate(lines, start=1):
try:
if not line.endswith('\n'):
raise ValueError
code, count, cmd, device = line.split(" ")
except ValueError:
logging.error("not enough values to unpack")
if n == len(lines):
self.buf = line
else:
self.buf = ""
continue
count = int(count, 16)
if (cmd != self.lastcmd
or self.r_suffix and cmd.endswith(self.r_suffix)):
self.release_key(cmd)
self.lastcmd = cmd
logging.debug("got key %s repeat %s", cmd, count)
self.uinputdev.send_key(cmd, count)
self.buf = ""
return True
def release_key(self, cmd):
logging.debug("release key %s", cmd)
self.lastcmd = None
self.uinputdev.release_key(cmd)
return False
class main(dbus.service.Object):
"""Listens to LIRC's domain socket and calls a method each time an
IR command is received."""
socket_list = []
def __init__(self):
parser = Options()
self.options = parser.get_opts()
self.bus = dbus.SystemBus()
bus_name = dbus.service.BusName('de.yavdr.lircd2uinput', bus=self.bus)
dbus.service.Object.__init__(self, bus_name, '/control')
self.uinputdev = Lirc2uinput(options=self.options)
socket_path = self.options.lircd_socket
if socket_path:
try:
logging.debug("init: adding socket %s", socket_path)
logging.debug(self.add_socket(socket_path,
self.options.r_suffix))
except Exception as e:
logging.exception(e)
logging.warn("could not add inital lircd socket")
GObject.timeout_add(1000, self.check_sockets)
def check_sockets(self):
for conn in self.socket_list[:]:
if not conn.still_alive():
logging.debug(("socket %s does not exist anymore, "
"removing from list") % conn.socket_path)
self.socketlist.remove(conn)
@dbus.service.method(
'de.yavdr.lircd2uinput', in_signature='ss', out_signature='bs')
def add_socket(self, socket_path, r_suffix):
if list(filter(lambda x: x.socket_path == socket_path,
self.socket_list)):
return False, "already watching socket %s" % (socket_path)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#sock.setblocking(False)
retries = 4
while True:
try:
sock.connect(socket_path)
except Exception as e:
if retries == 0:
logging.error(e)
return False, "Could not connect to {}: {}".format(socket_path, e)
retries -= 1
time.sleep(0.25)
else:
break
conn = SocketConnection(socket_path, sock, self.uinputdev, self.options, r_suffix)
self.socket_list.append(conn)
logging.debug("added connection to %s", socket_path)
return True, "connected to %s" % (socket_path)
@dbus.service.method(
'de.yavdr.lircd2uinput', in_signature='s', out_signature='bs')
def remove_socket(self, socket_path):
for n, conn in enumerate(self.socket_list):
if conn.socket_path == socket_path:
logging.debug("removing socket %s", socket_path)
conn.cleanup()
del(self.socket_list[n])
return True, ("removed socket %s" % (socket_path))
return False, ("no entry for %s" % (socket_path))
@dbus.service.method(
'de.yavdr.lircd2uinput', in_signature='s', out_signature='bs')
def emit_key(self, keyname):
try:
key = getattr(uinput, keyname)
self.uinputdev.emit_click(key)
return True, "pressed {}".format(key)
except AttributeError:
return False, "keyname {} is not supported".format(keyname)
except Exception as e:
return False, e
class Options:
def __init__(self):
self.parser = OptionParser()
self.parser.add_option(
"-s", "--lircd-socket",
dest="lircd_socket",
default=None,
help=u"choose lircd socket to listen on",
metavar="LIRCD_SOCKET",
)
self.parser.add_option(
"-r", "--release-suffix",
dest="r_suffix",
default=" ",
metavar="SUFFIX",
help=('suffix for released keys '
'(see lirc-option "-r --release[=suffix]")'),
)
def get_opts(self):
options, args = self.parser.parse_args()
return options
if __name__ == "__main__":
DBusGMainLoop(set_as_default=True)
logging.basicConfig(level=logging.INFO)
vlirc = main()
loop = GObject.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
loop.quit()
sys.exit()