Skip to content

Commit

Permalink
test: Add a script to test P2P through wpa_supplicant DBus if
Browse files Browse the repository at this point in the history
This script loosely mimics wpa_cli's P2P commands. It's just meant to
test wpa_supplicant's DBus API around P2P.
  • Loading branch information
Tomasz Bursztyka authored and pfl committed Feb 19, 2014
1 parent a1d11b7 commit 4990bf7
Show file tree
Hide file tree
Showing 2 changed files with 312 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile.am
Expand Up @@ -347,7 +347,7 @@ test_scripts = test/get-state test/list-services \
test/test-counter test/set-ipv4-method test/set-ipv6-method \
test/get-services test/get-proxy-autoconfig test/set-proxy \
test/enable-tethering test/disable-tethering test/backtrace \
test/test-session \
test/test-session test/p2p-on-supplicant \
test/test-new-supplicant test/service-move-before \
test/set-global-timeservers test/get-global-timeservers \
test/set-nameservers test/set-domains test/set-timeservers \
Expand Down
311 changes: 311 additions & 0 deletions test/p2p-on-supplicant
@@ -0,0 +1,311 @@
#!/usr/bin/python

from os import O_NONBLOCK
from sys import stdin, stdout, exit, version_info
from fcntl import fcntl, F_GETFL, F_SETFL
import glib
import dbus
import dbus.mainloop.glib
import gobject

WPA_NAME='fi.w1.wpa_supplicant1'
WPA_INTF='fi.w1.wpa_supplicant1'
WPA_PATH='/fi/w1/wpa_supplicant1'
WPA_IF_INTF = WPA_INTF + '.Interface'
WPA_P2P_INTF = WPA_IF_INTF + '.P2PDevice'
WPA_PEER_INTF = WPA_INTF + '.Peer'
DBUS_PROPERTIES_INTF = 'org.freedesktop.DBus.Properties'

class InputLine:
def __init__(self, handler):
self.line = ''
self.handler = handler

flags = fcntl(stdin.fileno(), F_GETFL)
flags |= O_NONBLOCK
fcntl(stdin.fileno(), F_SETFL, flags)
glib.io_add_watch(stdin, glib.IO_IN, self.input_cb)

self.prompt()

def prompt(self):
self.line = ''
print '> ',
stdout.flush()

def input_cb(self, fd, event):
if event != glib.IO_IN:
return

self.line += fd.read();
for line in self.line.split('\n'):
line = line.strip()
if len(line) == 0:
break

self.handler(line.strip())

self.prompt()

return True

def error_print(ex):
print 'Command Error: %s' % ex

def checkarg(nb_args):
def under(function):
def wrapper(*args, **kwargs):
if len(args[1]) != nb_args:
raise Exception('Command %s takes %s arguments' % (function.__name__, nb_args))
return function(*args, **kwargs)
return wrapper
return under

class Wpa_s:
def __init__(self, iface_name = None):
self.wpa = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), WPA_INTF)
bus.add_signal_receiver(self.__wpa_property_changed, path=WPA_PATH,
member_keyword='signal')
self.__reset()

self.line_in = InputLine(self.__command)

if self.iface_name != None:
self.create_if(self.iface_name)

def help(self, args):
print 'Commands:'
print 'quit'
print 'create_if <iface_name>'
print 'get_if <iface_name>'
print 'del_if'
print 'scan'
print 'p2p_find'
print 'p2p_stop_find'
print 'p2p_flush'
print 'p2p_group_add'
print 'p2p_group_remove'
print 'p2p_peers'
print 'p2p_peer <p2p device name>'
print 'p2p_connect <p2p device name>'

def __command(self, cmd_line):
cmd = cmd_line.split(' ')

try:
func = getattr(self, cmd[0])
except Exception, e:
print 'Error: command unknown - %s' % e
return

try:
func(cmd[1:])
except Exception, e:
error_print(e)

def __wpa_property_changed(*args, **kwargs):
print 'WPA - Signal: %s' % kwargs.get('signal')

def __if_property_changed(*args, **kwargs):
print 'IF - Signal: %s' % kwargs.get('signal')

def __p2p_property_changed(*args, **kwargs):
signal = kwargs.get('signal')
print 'IF P2P - Signal: %s' % signal

"""
It should be: __DeviceFound(self, object_path, properties)
wpa_supplicant's DBus API is buggy here:
- no properties are given
"""
def __DeviceFound(self, object_path):
self.peers[object_path] = None

peer = bus.get_object(WPA_INTF, object_path)
peer_if = dbus.Interface(peer, DBUS_PROPERTIES_INTF)

self.peers[object_path] = peer_if.GetAll(WPA_PEER_INTF)

def __DeviceLost(self, object_path):
if object_path in self.peers:
del self.peers[object_path]

"""
"Of course"... properties are not the group's properties,
but a bunch of informations related to the group:
- object path of the interface object
- it's role
- object path of the group object
"""
def __GroupStarted(self, properties):
self.group = properties
self.group_if = dbus.Interface(bus.get_object(WPA_INTF,
self.group['interface_object']),
WPA_P2P_INTF)

def __listen_if_signals(self):
bus.add_signal_receiver(self.__if_property_changed,
dbus_interface=WPA_IF_INTF,
path=self.iface_path,
member_keyword='signal')
bus.add_signal_receiver(self.__p2p_property_changed,
dbus_interface=WPA_P2P_INTF,
path=self.iface_path,
member_keyword='signal')
bus.add_signal_receiver(self.__GroupStarted,
dbus_interface=WPA_P2P_INTF,
path=self.iface_path,
signal_name='GroupStarted')
bus.add_signal_receiver(self.__DeviceFound,
dbus_interface=WPA_P2P_INTF,
path=self.iface_path,
signal_name='DeviceFound')
bus.add_signal_receiver(self.__DeviceLost,
dbus_interface=WPA_P2P_INTF,
path=self.iface_path,
signal_name='DeviceLost')

def __reset(self):
self.iface_path = self.iface_name = self.iface = None
self.p2p = self.group = self.group_if = None
self.peers = {}

def __set_if(self, iface_name):
self.iface = dbus.Interface(bus.get_object(WPA_INTF,
self.iface_path), WPA_IF_INTF)
self.p2p = dbus.Interface(bus.get_object(WPA_INTF,
self.iface_path), WPA_P2P_INTF)

print 'Interface %s: %s' % (iface_name, self.iface_path)
self.iface_name = iface_name
self.__listen_if_signals()

@checkarg(nb_args=1)
def create_if(self, args):
self.__reset()
self.iface_path = self.wpa.CreateInterface(({"Ifname" : args[0]}))
self.__set_if(args[0])

@checkarg(nb_args=1)
def get_if(self, args):
self.__reset()
self.iface_path = self.wpa.GetInterface(args[0])
self.__set_if(args[0])

@checkarg(nb_args=0)
def del_if(self, args = None):
if not self.iface_path:
return

self.wpa.RemoveInterface(self.iface_path)
print 'Interface %s removed' % self.iface_name
self.__reset()

@checkarg(nb_args=0)
def scan(self, args = None):
if not self.iface:
return

self.iface.Scan(({ 'Type': 'passive'}))
print 'Scan started'

@checkarg(nb_args=0)
def quit(self, args = None):
self.del_if(args)
exit(0)

@checkarg(nb_args=0)
def p2p_find(self, args = None):
if not self.iface:
return

self.p2p.Find(({}))

@checkarg(nb_args=0)
def p2p_stop_find(self, args = None):
if not self.p2p:
return

self.p2p.StopFind()

@checkarg(nb_args=0)
def p2p_peers(self, args = None):
if not self.iface:
return

for p in self.peers:
print 'Peer Name=%s' % (self.peers[p]['DeviceName'])

def __find_peer(self, peer_name, ret_object_path = False):
if len(self.peers) == 0:
return None

peer = None
for p in self.peers:
if self.peers[p]['DeviceName'] == peer_name:
peer = self.peers[p]
break

if not peer:
print 'No peer found under the name: %s' % peer_name
p = None

if ret_object_path:
return p
else:
return peer

@checkarg(nb_args=1)
def p2p_peer(self, args):
peer = self.__find_peer(args[0])
if peer:
for k in peer:
print '%s = %s' % (k, peer[k])

@checkarg(nb_args=1)
def p2p_connect(self, args):
if not self.p2p:
return

peer = self.__find_peer(args[0], True)
if peer:
pin = self.p2p.Connect(({ 'peer' : peer,
'wps_method' : 'pbc'}))
if not pin:
print 'WPS PIN in use: %s' % pin

@checkarg(nb_args=0)
def p2p_group_add(self, args):
if not self.p2p:
return

self.p2p.GroupAdd(({ 'persistent' : dbus.Boolean(1) }))

@checkarg(nb_args=0)
def p2p_group_remove(self, args):
if not self.group_if:
return

self.group_if.Disconnect()

@checkarg(nb_args=0)
def p2p_flush(self, args):
if not self.p2p:
return

self.p2p.Flush()

if __name__ == '__main__':
if version_info.major != 2:
print 'You need to run this under Python 2.x'
exit(1)

dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

bus = dbus.SystemBus()

mainloop = gobject.MainLoop()

wpa_s = Wpa_s()

mainloop.run()

0 comments on commit 4990bf7

Please sign in to comment.