Skip to content

Commit

Permalink
netlink: u32 filter for I/O core
Browse files Browse the repository at this point in the history
+ subscribe
  • Loading branch information
svinota committed Dec 7, 2013
1 parent 198163b commit 3163926
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 11 deletions.
70 changes: 60 additions & 10 deletions pyroute2/netlink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ def _monkey_handshake(self):
IPRCMD_DISCONNECT = 8
IPRCMD_SERVE = 9
IPRCMD_SHUTDOWN = 10
IPRCMD_SUBSCRIBE = 11
IPRCMD_UNSUBSCRIBE = 12


def _get_plugin(url):
Expand Down Expand Up @@ -435,6 +437,8 @@ def __init__(self,
self.servers = set() # set(socket, socket...)
self.controls = set() # set(socket, socket...)
self.ssl_keys = {} # {url: ssl_credentials(), url:...}
self.subscribe = {}
self._cids = list(range(1024))
# secret; write non-zero byte as terminator
self.secret = os.urandom(15)
self.secret += b'\xff'
Expand All @@ -451,6 +455,15 @@ def __init__(self,
#self._expire_thread.setDaemon(True)
self._expire_thread.start()

def alloc_cid(self):
try:
return self._cids.pop()
except IndexError:
return None

def dealloc_cid(self, cid):
self._cids.append(cid)

def alloc_addr(self, system, block=False):
with self._addr_lock:
if system not in self.active_sys:
Expand Down Expand Up @@ -625,13 +638,27 @@ def route_control(self, sock, data):
except Exception as e:
rsp['attrs'].append(['IPR_ATTR_ERROR',
traceback.format_exc()])
elif sock in self.clients:
#elif cmd['cmd'] == IPRCMD_SUBSCRIBE:
# offset = cmd.get_attr('IPR_ATTR_OFFSET')
# pattern = cmd.get_attr('IPR_ATTR_CDATA')
# self.subscribe[offset, pattern] = socket
if sock in self.clients:
if cmd['cmd'] == IPRCMD_SUBSCRIBE:
cid = self.alloc_cid()
if cid is not None:
self.subscribe[cid] = {'socket': sock,
'keys': []}
for key in cmd.get_attrs('IPR_ATTR_KEY'):
target = (key['offset'],
key['key'],
key['mask'])
self.subscribe[cid]['keys'].append(target)
rsp['cmd'] = IPRCMD_ACK

elif cmd['cmd'] == IPRCMD_UNSUBSCRIBE:
cid = cmd.get_attr('IPR_ATTR_CID')
if cid in self.subscribe:
del self.subscribe[cid]
self.dealloc_cid()
rsp['cmd'] = IPRCMD_ACK

if cmd['cmd'] == IPRCMD_REGISTER:
elif cmd['cmd'] == IPRCMD_REGISTER:
# auth request
secret = cmd.get_attr('IPR_ATTR_SECRET')
if secret == self.secret:
Expand Down Expand Up @@ -675,10 +702,27 @@ def route_data(self, sock, data):
# rsp.encode()
# sock.send(rsp.buf.getvalue())

def filter_u32(self, u32, data):
for offset, key, mask in u32['keys']:
data.seek(offset)
compare = struct.unpack('I', data.read(4))[0]
if compare & mask != key:
return
# envelope data
envelope = envmsg()
envelope['header']['type'] = NLMSG_TRANSPORT
envelope['attrs'] = [['IPR_ATTR_CDATA',
data.getvalue()]]
envelope.encode()
u32['socket'].send(envelope.buf.getvalue())

def route_local(self, sock, data, seq):
# extract masq info
target = self.masquerade.get(seq, None)
if target is not None:
if target is None:
for cid, u32 in self.subscribe.items():
self.filter_u32(u32, data)
else:
offset = 0
while offset < data.length:
data.seek(offset)
Expand Down Expand Up @@ -923,12 +967,12 @@ class Netlink(threading.Thread):
By default, netlink class connects to the local netlink socket
on startup. If you prefer to connect to another host, use::
nl = Netlink(host='tcp://remote.01host:7000')
nl = Netlink(host='tcp://remote.host:7000')
It is possible to connect to uplinks after the startup::
nl = Netlink(do_connect=False)
nl.connect('tcp://remote.01host:7000')
nl.connect('tcp://remote.host:7000')
To act as a server, call serve()::
Expand All @@ -940,7 +984,7 @@ class Netlink(threading.Thread):
groups = 0
marshal = Marshal

def __init__(self, debug=False, timeout=3, do_connect=True,
def __init__(self, debug=False, timeout=3000, do_connect=True,
host=None, key=None, cert=None, ca=None):
threading.Thread.__init__(self, name='Netlink API')
self._timeout = timeout
Expand Down Expand Up @@ -1194,7 +1238,13 @@ def monitor(self, operate=True):
'''
if operate:
self.listeners[0] = Queue.Queue(maxsize=_QUEUE_MAXSIZE)
self.cid = self.command(IPRCMD_SUBSCRIBE,
[['IPR_ATTR_KEY', {'offset': 8,
'key': 0,
'mask': 0}]])
else:
self.command(IPRCMD_UNSUBSCRIBE,
[['IPR_ATTR_CID', self.cid]])
del self.listeners[0]

def register_callback(self, callback, predicate=lambda x: True, args=None):
Expand Down
9 changes: 8 additions & 1 deletion pyroute2/netlink/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,14 @@ class ctrlmsg(genlmsg):
('IPR_ATTR_SECRET', 'asciiz'),
('IPR_ATTR_HOST', 'asciiz'),
('IPR_ATTR_ADDR', 'uint32'),
('IPR_ATTR_ERROR', 'asciiz'))
('IPR_ATTR_ERROR', 'asciiz'),
('IPR_ATTR_CID', 'uint32'),
('IPR_ATTR_KEY', 'u32key'))

class u32key(nla):
fields = (('offset', 'I'),
('key', 'I'),
('mask', 'I'))


class envmsg(nlmsg):
Expand Down

0 comments on commit 3163926

Please sign in to comment.