Skip to content
This repository has been archived by the owner on Jan 27, 2023. It is now read-only.

Commit

Permalink
Return strings instead of byte sequences/ints
Browse files Browse the repository at this point in the history
- IP addresses
- MAC addresses
- (B)SSIDs

Are now all returned in human-readable form instead of as
bytestrings/ints
  • Loading branch information
seveas committed Mar 7, 2013
1 parent 3197d3f commit 76103cc
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 38 deletions.
58 changes: 54 additions & 4 deletions NetworkManager.py
@@ -1,8 +1,12 @@
import dbus
import socket
import struct
import sys

if sys.version_info >= (3,0):
basestring = str
if not hasattr(__builtins__, 'bytes'):
bytes = chr

class NMDbusInterface(object):
bus = dbus.SystemBus()
Expand Down Expand Up @@ -30,9 +34,11 @@ def __init__(self, object_path=None):

def _make_property(self, name):
def get(self):
return self.unwrap(self.proxy.Get(self.interface_name, name,
data = self.unwrap(self.proxy.Get(self.interface_name, name,
dbus_interface='org.freedesktop.DBus.Properties'))
return self.postprocess(name, data)
def set(self, value):
data = self.preprocess(name, data)
return self.proxy.Set(self.interface_name, name, self.wrap(value),
dbus_interface='org.freedesktop.DBus.Properties')
return property(get, set)
Expand All @@ -58,8 +64,10 @@ def unwrap(self, val):
return bool(val)
if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32, dbus.UInt32, dbus.Int64, dbus.UInt64)):
return int(val)
if isinstance(val, dbus.Byte):
return bytes(int(val))
return val

def wrap(self, val):
if isinstance(val, NMDbusInterface):
return val.object_path
Expand All @@ -78,8 +86,9 @@ def proxy_call(*args, **kwargs):
func = getattr(self.interface, name)
args = self.wrap(args)
kwargs = self.wrap(kwargs)
ret = func(*args, **kwargs)
return self.unwrap(ret)
args, kwargs = self.preprocess(name, args, kwargs)
ret = self.unwrap(func(*args, **kwargs))
return self.postprocess(name, ret)
return proxy_call

def connect_to_signal(self, signal, handler, *args, **kwargs):
Expand All @@ -90,6 +99,12 @@ def helper(*args, **kwargs):
kwargs = self.wrap(kwargs)
return self.proxy.connect_to_signal(signal, helper, *args, **kwargs)

def postprocess(self, name, val):
return val

def preprocess(self, name, args, kwargs):
return args, kwargs

class NetworkManager(NMDbusInterface):
interface_name = 'org.freedesktop.NetworkManager'
object_path = '/org/freedesktop/NetworkManager'
Expand All @@ -103,6 +118,28 @@ class Settings(NMDbusInterface):
class Connection(NMDbusInterface):
interface_name = 'org.freedesktop.NetworkManager.Settings.Connection'

def postprocess(self, name, val):
# SSID is sent as bytes, make it a string
if name == 'GetSettings':
if 'ssid' in val.get('802-11-wireless', {}):
val['802-11-wireless']['ssid'] = "".join(val['802-11-wireless']['ssid'])
for key in val:
val_ = val[key]
if 'mac-address' in val_:
val_['mac-address'] = "%02X:%02X:%02X:%02X:%02X:%02X" % tuple([ord(x) for x in val_['mac-address']])
if 'bssid' in val_:
val_['bssid'] = "%02X:%02X:%02X:%02X:%02X:%02X" % tuple([ord(x) for x in val_['bssid']])
if 'ipv4' in val:
val['ipv4']['addresses'] = [(socket.inet_ntoa(struct.pack('L', addr[0])),
addr[1], socket.inet_ntoa(struct.pack('L', addr[2])))
for addr in val['ipv4']['addresses']]
val['ipv4']['routes'] = [(socket.inet_ntoa(struct.pack('L', route[0])),
route[1], socket.inet_ntoa(struct.pack('L', route[2])),
socket.ntohl(route[3])) for route in val['ipv4']['routes']]
val['ipv4']['dns'] = [socket.inet_ntoa(struct.pack('L', addr))
for addr in val['ipv4']['dns']]
return val

class ActiveConnection(NMDbusInterface):
interface_name = 'org.freedesktop.NetworkManager.Connection.Active'

Expand Down Expand Up @@ -143,6 +180,19 @@ class OlpcMesh(NMDbusInterface):
class IP4Config(NMDbusInterface):
interface_name = 'org.freedesktop.NetworkManager.IP4Config'

def postprocess(self, name, val):
if name == 'Addresses':
return [(socket.inet_ntoa(struct.pack('L', addr[0])),
addr[1], socket.inet_ntoa(struct.pack('L', addr[2])))
for addr in val]
if name == 'Routes':
return [(socket.inet_ntoa(struct.pack('L', route[0])),
route[1], socket.inet_ntoa(struct.pack('L', route[2])),
socket.ntohl(route[3])) for route in val]
if name in ('Nameservers', 'WinsServers'):
return [socket.inet_ntoa(struct.pack('L', addr)) for addr in val]
return val

class IP6Config(NMDbusInterface):
interface_name = 'org.freedesktop.NetworkManager.IP6Config'

Expand Down
14 changes: 4 additions & 10 deletions examples/connection_detail.py
Expand Up @@ -2,8 +2,6 @@
Display detailed information about currently active connections.
"""
import NetworkManager
import socket
import struct

c = NetworkManager.const

Expand All @@ -22,8 +20,6 @@
for key, val in sorted(settings.items()):
print(" %s" % key)
for name, value in val.items():
if name == 'ssid':
value = "".join([str(x) for x in value])
print(format % (name, value))
for dev in conn.Devices:
print("Device: %s" % dev.Interface)
Expand All @@ -35,12 +31,10 @@
print(" IPv4 config")
print(" Addresses")
for addr in dev.Ip4Config.Addresses:
print(" %s/%d -> %s" % (socket.inet_ntoa(struct.pack('L', addr[0])), addr[1],
socket.inet_ntoa(struct.pack('L', addr[2]))))
print(" %s/%d -> %s" % addr)
print(" Routes")
for addr in dev.Ip4Config.Routes:
print(" %s/%d -> %s (%d)" % (socket.inet_ntoa(struct.pack('L', route[0])), route[1],
socket.inet_ntoa(struct.pack('L', route[2])), socket.ntohl(route[3])))
for route in dev.Ip4Config.Routes:
print(" %s/%d -> %s (%d)" % route)
print(" Nameservers")
for ns in dev.Ip4Config.Nameservers:
print(" %s" % socket.inet_ntoa(struct.pack('L', ns)))
print(" %s" % ns)
30 changes: 6 additions & 24 deletions n-m
Expand Up @@ -167,18 +167,18 @@ def info(names):
print("Connect automatically:", ["No","Yes"][conn['connection'].get('autoconnect', True)])
print("Last connected on:", str(datetime.datetime.fromtimestamp(conn['connection']['timestamp'])))
print("IPv4 settings (%s)" % conn['ipv4']['method'])
print(" Address(es):", auto(ipv4s(conn['ipv4']['addresses'])))
print(" DNS servers:", auto(ipv4s(conn['ipv4']['dns'])))
print(" Routes:", auto(routes(conn['ipv4']['routes'])))
print(" Address(es):", ', '.join([x[0] for x in conn['ipv4']['addresses']]) or '(Automatic)')
print(" DNS servers:", ', '.join(conn['ipv4']['dns']) or '(Automatic)')
print(" Routes:", ", ".join(["%s/%d -> %s" % x[:3] for x in conn['ipv4']['routes']]))
print(" Can be default route:", ["Yes","No"][conn['ipv4'].get('never-default', False)])

if conn['connection']['type'] == '802-3-ethernet':
print("Physical link")
print(" MAC address:", mac(conn['802-3-ethernet'].get('mac-address')))
print(" MAC address:", conn['802-3-ethernet'].get('mac-address', '(Automatic)'))
elif conn['connection']['type'] == '802-11-wireless':
print("Wireless link")
print(" MAC address:", mac(conn['802-11-wireless'].get('mac-address')))
print(" SSID:", ssid(conn['802-11-wireless']['ssid']))
print(" MAC address:", conn['802-11-wireless'].get('mac-address', '(Automatic)'))
print(" SSID:", conn['802-11-wireless']['ssid'])
print(" Wireless security:", conn[conn['802-11-wireless']['security']]['key-mgmt'])
elif conn['connection']['type'] == 'vpn':
print("VPN")
Expand All @@ -196,23 +196,5 @@ def dump(names):
from pprint import pprint
pprint(connections[n])

def auto(seq):
return "(Automatic)" if not seq else ", ".join(seq)

def ipv4(ip):
return socket.inet_ntoa(struct.pack('L',ip))

def ipv4s(seq):
return [ipv4(x) for x in seq]

def routes(seq):
return ["%s/%d -> %s" % (ipv4(x[0]), x[1], ipv4(x[2])) for x in seq]

def mac(seq):
return "(Automatic)" if not seq else "%X:%X:%X:%X:%X:%X" % tuple(seq)

def ssid(seq):
return "".join([chr(x) for x in seq])

if __name__ == '__main__':
main()

0 comments on commit 76103cc

Please sign in to comment.