-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathantimicro-nonX.py
218 lines (184 loc) · 7.57 KB
/
antimicro-nonX.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/bin/python3
from evdev import InputDevice, categorize, ecodes, UInput
import sys
import yaml
import time
import threading
import subprocess
import shlex
import logging
import pprint
from logging.config import dictConfig
# Logging setup: every record is written both to stdout and to a
# size-rotated log file, using one shared line format.
logging_config = {
    'version': 1,
    'formatters': {
        'f': {
            'format': '%(asctime)s %(levelname)-8s [%(funcName)s:%(lineno)d] %(message)s',
        },
    },
    'handlers': {
        # 'c' = console handler
        'c': {
            'class': 'logging.StreamHandler',
            'formatter': 'f',
            'level': logging.DEBUG,
            'stream': "ext://sys.stdout",
        },
        # 'f' = rotating file handler (maxBytes per file, backupCount old files kept)
        'f': {
            'class': 'logging.handlers.RotatingFileHandler',
            'formatter': 'f',
            'filename': '/var/log/antimicro-nonX.log',
            'maxBytes': 1000000,
            'backupCount': 4,
        },
    },
    'root': {
        'handlers': ['c', 'f'],
        'level': logging.DEBUG,
    },
}
dictConfig(logging_config)
logger = logging.getLogger(__name__)
def pressKeys(keylist):
    """Emit a key combination on the virtual keyboard.

    Every recognised key in *keylist* is pressed (value 1) and the presses
    are flushed with a single sync; the same keys are then released
    (value 0) and flushed again.

    Args:
        keylist: list of evdev key names (for example ``KEY_LEFTCTRL``).
            Entries that are not names in ``ecodes.ecodes`` are logged once
            and skipped.

    Returns:
        None. Relies on the module-level ``ui`` (the UInput device) and
        ``logger``.
    """
    logger.info("Pressing key combination: " + " + ".join(str(key) for key in keylist))

    # Resolve names to codes once, so validation and its error message are
    # not repeated for the press and the release pass.
    codes = []
    for key in keylist:
        # ecodes.ecodes is the name -> code table. dir(ecodes) would also
        # accept module attributes that are not key names (e.g. the KEY and
        # BTN tables), which then fail the ecodes.ecodes lookup.
        if not isinstance(key, str) or key not in ecodes.ecodes:
            logger.error("Key %s is not available for your keyboard, skipping", key)
            continue
        codes.append(ecodes.ecodes[key])

    if not codes:
        # nothing valid to send
        return

    for code in codes:
        ui.write(ecodes.EV_KEY, code, 1)
    ui.syn()
    for code in codes:
        ui.write(ecodes.EV_KEY, code, 0)
    ui.syn()
def runBGShell(command):
    """Run *command* and wait for it to finish.

    The command string is tokenised with ``shlex.split`` and executed
    without a shell. Failures are logged instead of raised, because this
    function is used as a thread target and an exception would only end up
    as a traceback from the worker thread.

    Args:
        command: the command line to execute, as a single string.

    Returns:
        None.
    """
    logger.info("Executing "+ command)
    try:
        # split the command into an argument list
        argv = shlex.split(command)
        if not argv:
            logger.error("Empty command, nothing to execute")
            return
        result = subprocess.run(argv, stdout=subprocess.PIPE, universal_newlines=True)
    except (OSError, ValueError) as err:
        # OSError: executable missing or not runnable.
        # ValueError: unbalanced quotes in the command, or undecodable output.
        logger.error("Unable to execute %s: %s", command, err)
        return

    if result.stdout:
        logger.debug("Output of %s: %s", command, result.stdout.rstrip())
    if result.returncode != 0:
        logger.error("Command %s exited with status %s", command, result.returncode)
def keySequence(sequence, delay=0.5):
    """Send several key combinations one after another.

    Args:
        sequence: iterable of key-name lists; each item is passed to
            ``pressKeys`` as one combination.
        delay: seconds to pause after each combination (default 0.5, the
            value that used to be hard-coded).

    Returns:
        None.
    """
    for item in sequence:
        pressKeys(item)
        time.sleep(delay)
def getKeyName(number):
    """Translate a numeric event code into its evdev name.

    ``ecodes.KEY`` is consulted first and ``ecodes.BTN`` second.

    Args:
        number: the event code to look up.

    Returns:
        The entry stored for the code in the first table that contains it,
        or None when neither table knows the code. NOTE(review): the caller
        handles both a single string and a list of names, so the entry may
        be either.
    """
    for table in (ecodes.KEY, ecodes.BTN):
        if number in table:
            return table[number]
    # must be invalid
    return None
""" Parse and load the configuration file """
conf = {}


def parseConfig(conffile):
    """Load the YAML configuration file into the module-level ``conf``.

    The process exits with status 2 when the file cannot be read, is not
    valid YAML, or does not contain a mapping at its top level (an empty
    file, for instance, loads as None).

    Args:
        conffile: path of the YAML configuration file.

    Returns:
        The loaded configuration dict (the same object stored in ``conf``).
    """
    global conf
    try:
        with open(conffile, 'r') as stream:
            loaded = yaml.load(stream, Loader=yaml.SafeLoader)
    except OSError as exc:
        logger.error(exc)
        logger.error("Unable to read configuration file "+conffile)
        sys.exit(2)
    except yaml.YAMLError as exc:
        logger.error(exc)
        logger.error("Unable to parse configuration file "+conffile)
        sys.exit(2)

    # The rest of the script indexes conf by key, so anything but a mapping
    # would only fail later with a less helpful error.
    if not isinstance(loaded, dict):
        logger.error("Configuration file "+conffile+" does not contain a YAML mapping")
        sys.exit(2)

    conf = loaded
    return conf
# Exactly one command-line argument is expected: the path of the YAML
# configuration file. Anything else prints the usage line and exits with 1.
if len(sys.argv) != 2:
    logger.fatal("Usage: "+sys.argv[0]+" /path/to/config.yaml")
    sys.exit(1)
parseConfig(sys.argv[1])
# The virtual output device; it is created by the main loop below.
ui = None


def _addKeyCodes(names, codes):
    """Add the evdev code of every recognised key name in *names* to the set *codes*."""
    for name in names:
        # ecodes.ecodes is the name -> code table. dir(ecodes) would also
        # accept module attributes that are not key names.
        if not isinstance(name, str) or name not in ecodes.ecodes:
            logger.error("Key %s is not available for your keyboard, skipping", name)
            continue
        codes.add(ecodes.ecodes[name])


# Build a dynamic keyboard capability list based on what is configured: the
# mapped input keys plus every key the configured actions need to emit.
key_mapping = conf.get('key_mapping') if isinstance(conf, dict) else None
if not isinstance(key_mapping, dict):
    logger.error("Configuration has no 'key_mapping' section")
    sys.exit(2)

cap_unique = set()
for key, action in key_mapping.items():
    # validate that it's an understood key code
    if not isinstance(key, str) or key not in ecodes.ecodes:
        logger.error("Key %s is not available for your keyboard, skipping", key)
        continue
    cap_unique.add(ecodes.ecodes[key])

    # A string action just runs a command and emits no keys; an empty list
    # has nothing to emit either.
    if not isinstance(action, list) or not action:
        continue

    logger.debug("key "+key+" -> list")
    if isinstance(action[0], str):
        # a single combination of keys pressed together
        logger.debug("first item is str")
        _addKeyCodes(action, cap_unique)
    elif isinstance(action[0], list):
        # a sequence of combinations sent in succession
        logger.debug("first item is list")
        for sequence in action:
            _addKeyCodes(sequence, cap_unique)

# sorted() only makes the capability list deterministic
cap = {
    ecodes.EV_KEY: sorted(cap_unique)
}
def _runInBackground(target, argument):
    """Start target(argument) on a daemon thread so the event loop is not blocked."""
    thread = threading.Thread(target=target, args=[argument])
    thread.daemon = True  # do not keep the process alive for a running action
    thread.start()


def _handleKey(key_name):
    """Launch the action configured for *key_name*; unmapped keys are ignored."""
    mapping = conf['key_mapping']
    if key_name not in mapping:
        return
    action = mapping[key_name]
    logger.info("Handling "+key_name)
    # Handling depends on the YAML definition.
    if isinstance(action, str):
        # a plain string is executed as a command
        logger.info(key_name+ " -> " + action)
        _runInBackground(runBGShell, action)
    elif isinstance(action, list) and action:
        if isinstance(action[0], str):
            # a combination of keys that need to be pressed together
            _runInBackground(pressKeys, action)
        elif isinstance(action[0], list):
            # a sequence of combinations that need to be sent in succession
            logger.info("Sending keys in succession...")
            _runInBackground(keySequence, action)


# Keep trying to open the input device (in case it's intermittent). When the
# device goes away while reading, both handles are closed and the loop starts
# over instead of terminating the script.
while True:
    device = None
    try:
        device = InputDevice(conf['input_device'])
        logger.info("Opened "+str(device))
        # create the fake input device with the capabilities collected in cap
        ui = UInput(cap, name='antimicro-nonX-fake-input')
        logger.info("Created input device "+str(ui.device))
    except Exception as err:
        logger.error(str(err))
        if device is not None:
            # the input device opened but the fake device could not be created
            device.close()
        time.sleep(2)
        continue

    # By default react on key press (1), not release (0). Hold-down repeat
    # events (value 2) are ignored for now. A missing option means "press".
    trigger_value = 0 if conf.get('react_on_release', False) == True else 1

    try:
        # read all events
        for event in device.read_loop():
            logger.debug("Received event %s", event)
            if event.type != ecodes.EV_KEY or event.value != trigger_value:
                continue
            # see if the key code was mapped to some action
            logger.debug("Checking if %s is mapped", event.code)
            key_names = getKeyName(event.code)
            if key_names is None:
                continue
            if isinstance(key_names, str):
                # it's just one key: treat it as a list of one
                key_names = [key_names]
            for key_name in key_names:
                _handleKey(key_name)
    except OSError as err:
        # typically the input device disappeared; fall through and reopen it
        logger.error(str(err))
    finally:
        ui.close()
        device.close()
    time.sleep(2)