forked from selfspy/selfspy
/
sniff_x.py
116 lines (95 loc) · 4.08 KB
/
sniff_x.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# This file is loosely based on examples/record_demo.py in python-xlib
import sys
import os
import re
import time
import threading
from Xlib import X, XK, display
from Xlib.ext import record
from Xlib.protocol import rq
def state_to_idx(state): #this could be a dict, but I might want to extend it.
if state == 1: return 1
if state == 128: return 4
if state == 129: return 5
return 0
class SniffX:
def __init__(self):
self.keysymdict = {}
for name in dir(XK):
if name.startswith("XK_"):
self.keysymdict[getattr(XK, name)] = name[3:]
self.key_hook = lambda x: True
self.mouse_button_hook = lambda x: True
self.mouse_move_hook = lambda x: True
self.contextEventMask = [X.KeyPress, X.MotionNotify] #X.MappingNotify?
self.the_display = display.Display()
self.record_display = display.Display()
self.keymap = self.the_display._keymap_codes
def run(self):
# Check if the extension is present
if not self.record_display.has_extension("RECORD"):
print "RECORD extension not found"
sys.exit(1)
else:
print "RECORD extension present"
# Create a recording context; we only want key and mouse events
self.ctx = self.record_display.record_create_context(
0,
[record.AllClients],
[{
'core_requests': (0, 0),
'core_replies': (0, 0),
'ext_requests': (0, 0, 0, 0),
'ext_replies': (0, 0, 0, 0),
'delivered_events': (0, 0),
'device_events': tuple(self.contextEventMask),
'errors': (0, 0),
'client_started': False,
'client_died': False,
}])
# Enable the context; this only returns after a call to record_disable_context,
# while calling the callback function in the meantime
self.record_display.record_enable_context(self.ctx, self.processevents)
# Finally free the context
self.record_display.record_free_context(self.ctx)
def cancel(self):
self.the_display.record_disable_context(self.ctx)
self.the_display.flush()
def processevents(self, reply):
if reply.category != record.FromServer:
return
if reply.client_swapped:
print "* received swapped protocol data, cowardly ignored"
return
if not len(reply.data) or ord(reply.data[0]) < 2:
# not an event
return
data = reply.data
while len(data):
event, data = rq.EventField(None).parse_binary_value(data, self.record_display.display, None, None)
if event.type in [X.KeyPress, X.KeyRelease]:
self.key_hook(*self.key_event(event))
elif event.type in [X.ButtonPress, X.ButtonRelease]:
self.mouse_button_hook(*self.button_event(event))
elif event.type == X.MotionNotify:
self.mouse_move_hook(event.root_x, event.root_y)
elif event.type == X.MappingNotify:
self.the_display.refresh_keyboard_mapping()
newkeymap = self.the_display._keymap_codes
print 'Change keymap!', newkeymap == self.keymap
self.keymap = newkeymap
def get_key_name(self, keycode, state):
state_idx = state_to_idx(state)
cn = self.keymap[keycode][state_idx]
if cn < 256:
return chr(cn).decode('latin1')#.encode('utf8')
else:
return self.lookup_keysym(cn)
def key_event(self, event):
return event.detail, event.state, self.get_key_name(event.detail, event.state), event.type == X.KeyPress
def button_event(self, event):
return event.detail, event.type == X.ButtonPress
def lookup_keysym(self, keysym):
if keysym in self.keysymdict:
return self.keysymdict[keysym]
return "[%d]" % keysym