Skip to content

Commit

Permalink
Remove device_controller fixture is singleton al all
Browse files Browse the repository at this point in the history
Update wifi provisioning tests
  • Loading branch information
ATmobica committed Nov 25, 2021
1 parent bfdc9d3 commit 1029216
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 33 deletions.
9 changes: 0 additions & 9 deletions src/test_driver/mbed/integration_tests/common/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
from .serial_connection import SerialConnection
from .serial_device import SerialDevice

from chip import ChipDeviceCtrl

import logging
log = logging.getLogger(__name__)

Expand Down Expand Up @@ -132,10 +130,3 @@ def device(board_allocator):
device = board_allocator.allocate(name='DUT')
yield device
board_allocator.release(device)


@pytest.fixture(scope="function")
def device_controller():
devCtrl = ChipDeviceCtrl.ChipDeviceController()
yield devCtrl
devCtrl.__del__
126 changes: 116 additions & 10 deletions src/test_driver/mbed/integration_tests/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import os
import platform
import random
import shlex

from chip import exceptions

Expand All @@ -29,8 +30,90 @@
import logging
log = logging.getLogger(__name__)

class ParsingError(exceptions.ChipStackException):
def __init__(self, msg=None):
self.msg = "Parsing Error: " + msg

def scan_chip_ble_devices(devCtrl):
def __str__(self):
return self.msg

def ParseEncodedString(value):
if value.find(":") < 0:
raise ParsingError(
"Value should be encoded in encoding:encodedvalue format")
enc, encValue = value.split(":", 1)
if enc == "str":
return encValue.encode("utf-8") + b'\x00'
elif enc == "hex":
return bytes.fromhex(encValue)
raise ParsingError("Only str and hex encoding is supported")


def ParseValueWithType(value, type):
if type == 'int':
return int(value)
elif type == 'str':
return value
elif type == 'bytes':
return ParseEncodedString(value)
elif type == 'bool':
return (value.upper() not in ['F', 'FALSE', '0'])
else:
raise ParsingError('Cannot recognize type: {}'.format(type))

def FormatZCLArguments(args, command):
commandArgs = {}
for kvPair in args:
if kvPair.find("=") < 0:
raise ParsingError("Argument should in key=value format")
key, value = kvPair.split("=", 1)
valueType = command.get(key, None)
commandArgs[key] = ParseValueWithType(value, valueType)
return commandArgs

def send_zcl_command(devCtrl, line):
"""
Send ZCL message to device:
<cluster> <command> <nodeid> <endpoint> <groupid> [key=value]...
:param devCtrl: device controller instance
:param line: command line
:return: error code and command responde
"""
res = None
err = 0
try:
args = shlex.split(line)
all_commands = devCtrl.ZCLCommandList()
if len(args) < 5:
raise exceptions.InvalidArgumentCount(5, len(args))

if args[0] not in all_commands:
raise exceptions.UnknownCluster(args[0])
command = all_commands.get(args[0]).get(args[1], None)
# When command takes no arguments, (not command) is True
if command == None:
raise exceptions.UnknownCommand(args[0], args[1])
err, res = devCtrl.ZCLSend(args[0], args[1], int(
args[2]), int(args[3]), int(args[4]), FormatZCLArguments(args[5:], command), blocking=True)
if err != 0:
log.error("Failed to receive command response: {}".format(res))
elif res != None:
log.info("Received command status response:")
log.info(res)
else:
log.info("Success, no status code is attached with response.")
except exceptions.ChipStackException as ex:
log.error("An exception occurred during process ZCL command:")
log.error(str(ex))
err = -1
except Exception as ex:
log.error("An exception occurred during processing input:")
log.error(str(ex))
err = -1

return (err, res)

def scan_chip_ble_devices(devCtrl, name):
"""
BLE scan CHIP device
BLE scanning for 10 seconds and collect the results
Expand All @@ -42,32 +125,55 @@ def scan_chip_ble_devices(devCtrl):
bleMgr.scan("-t 10")

for device in bleMgr.peripheral_list:
if device.Name != name:
continue
devIdInfo = bleMgr.get_peripheral_devIdInfo(device)
if devIdInfo:
log.info("Found CHIP device {}".format(device.Name))
devices.append(devIdInfo)

return devices

def run_wifi_provisioning(devCtrl, ssid, password, discriminator, pinCode, nodeId=None):

def connect_device_over_ble(devCtrl, discriminator, pinCode, nodeId=None):
"""
Run WiFi provisionning via BLE
Connect to Matter accessory device over BLE
:param devCtrl: device controller instance
:param ssid: network ssid
:param password: network password
:param discriminator: CHIP device discriminator
:param pinCode: CHIP device pin code
:param nodeId: default value of node ID
:return: node ID is provisioning successful, otherwise None
:return: node ID is provisioning successful, otherwise None
"""
if nodeId == None:
nodeId = random.randint(1, 1000000)

try:
devCtrl.SetWifiCredential(ssid, password)
devCtrl.ConnectBLE(int(discriminator), int(pinCode), int(nodeId))
except exceptions.ChipStackException as ex:
log.error("WiFi provisioning failed: {}".format(str(ex)))
log.error("Connect device over BLE failed: {}".format(str(ex)))
return None

return nodeId
return nodeId

def commissioning_wifi(devCtrl, ssid, password, nodeId):
"""
Commissioning a Wi-Fi device
:param devCtrl: device controller instance
:param ssid: network ssid
:param password: network password
:param nodeId: value of node ID
:return: error code
"""

# Inject the credentials to the device
err, res = send_zcl_command(devCtrl, "NetworkCommissioning AddWiFiNetwork {} 0 0 ssid=str:{} credentials=str:{} breadcrumb=0 timeoutMs=1000".format(nodeId, ssid, password))
if err != 0 and res["Status"] != 0:
log.error("Set Wi-Fi credentials failed [{}]".format(err))
return err

# Enable the Wi-Fi interface
err, res = send_zcl_command(devCtrl, "NetworkCommissioning EnableNetwork {} 0 0 networkID=str:{} breadcrumb=0 timeoutMs=1000".format(nodeId, ssid))
if err != 0 and res["Status"] != 0:
log.error("Enable Wi-Fi failed [{}]".format(err))
return err

return err
31 changes: 24 additions & 7 deletions src/test_driver/mbed/integration_tests/lighting-app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@
# limitations under the License.

import pytest
import re

from chip.setup_payload import SetupPayload
from chip import exceptions
from chip import ChipDeviceCtrl

from common.utils import scan_chip_ble_devices, run_wifi_provisioning
from common.utils import scan_chip_ble_devices, connect_device_over_ble, commissioning_wifi

import logging
log = logging.getLogger(__name__)

BLE_DEVICE_NAME = "MBED-lighting"
DEVICE_NODE_ID = 1234

@pytest.mark.smoketest
def test_smoke_test(device):
device.reset(duration=1)
Expand All @@ -32,18 +37,24 @@ def test_smoke_test(device):
assert ret != None and len(ret) > 0


def test_wifi_provisioning(device, network, device_controller):
ef test_wifi_provisioning(device, network):
network_ssid = network[0]
network_pass = network[1]

devCtrl = ChipDeviceCtrl.ChipDeviceController()

ret = device.wait_for_output("SetupQRCode")
assert ret != None and len(ret) > 1

qr_code = ret[-1].split('[', 1)[1].split(']')[0]
device_details = dict(SetupPayload().ParseQrCode("VP:vendorpayload%{}".format(qr_code)).attributes)
qr_code = re.sub(r"[\[\]]", "", ret[-1].partition("SetupQRCode:")[2]).strip()
try:
device_details = dict(SetupPayload().ParseQrCode("VP:vendorpayload%{}".format(qr_code)).attributes)
except exceptions.ChipStackError as ex:
log.error(ex.msg)
assert False
assert device_details != None and len(device_details) != 0

ble_chip_device = scan_chip_ble_devices(device_controller)
ble_chip_device = scan_chip_ble_devices(devCtrl, BLE_DEVICE_NAME)
assert ble_chip_device != None and len(ble_chip_device) != 0

chip_device_found = False
Expand All @@ -57,5 +68,11 @@ def test_wifi_provisioning(device, network, device_controller):

assert chip_device_found

ret = run_wifi_provisioning(device_controller, network_ssid, network_pass, int(device_details["Discriminator"]), int(device_details["SetUpPINCode"]), DEVICE_NODE_ID)
assert ret != None and ret == DEVICE_NODE_ID
ret = connect_device_over_ble(devCtrl, int(device_details["Discriminator"]), int(device_details["SetUpPINCode"]), DEVICE_NODE_ID)
assert ret != None and ret == DEVICE_NODE_ID

ret = device.wait_for_output("Device completed Rendezvous process")
assert ret != None and len(ret) > 0

ret = commissioning_wifi(devCtrl, network_ssid, network_pass, DEVICE_NODE_ID)
assert ret == 0
31 changes: 24 additions & 7 deletions src/test_driver/mbed/integration_tests/lock-app/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@
# limitations under the License.

import pytest
import re

from chip.setup_payload import SetupPayload
from chip import exceptions
from chip import ChipDeviceCtrl

from common.utils import scan_chip_ble_devices, run_wifi_provisioning
from common.utils import scan_chip_ble_devices, connect_device_over_ble, commissioning_wifi

import logging
log = logging.getLogger(__name__)

BLE_DEVICE_NAME = "MBED-lock"
DEVICE_NODE_ID = 1234

@pytest.mark.smoketest
def test_smoke_test(device):
device.reset(duration=1)
Expand All @@ -32,18 +37,24 @@ def test_smoke_test(device):
assert ret != None and len(ret) > 0


def test_wifi_provisioning(device, network, device_controller):
def test_wifi_provisioning(device, network):
network_ssid = network[0]
network_pass = network[1]

devCtrl = ChipDeviceCtrl.ChipDeviceController()

ret = device.wait_for_output("SetupQRCode")
assert ret != None and len(ret) > 1

qr_code = ret[-1].split('[', 1)[1].split(']')[0]
device_details = dict(SetupPayload().ParseQrCode("VP:vendorpayload%{}".format(qr_code)).attributes)
qr_code = re.sub(r"[\[\]]", "", ret[-1].partition("SetupQRCode:")[2]).strip()
try:
device_details = dict(SetupPayload().ParseQrCode("VP:vendorpayload%{}".format(qr_code)).attributes)
except exceptions.ChipStackError as ex:
log.error(ex.msg)
assert False
assert device_details != None and len(device_details) != 0

ble_chip_device = scan_chip_ble_devices(device_controller)
ble_chip_device = scan_chip_ble_devices(devCtrl, BLE_DEVICE_NAME)
assert ble_chip_device != None and len(ble_chip_device) != 0

chip_device_found = False
Expand All @@ -57,5 +68,11 @@ def test_wifi_provisioning(device, network, device_controller):

assert chip_device_found

ret = run_wifi_provisioning(device_controller, network_ssid, network_pass, int(device_details["Discriminator"]), int(device_details["SetUpPINCode"]), DEVICE_NODE_ID)
assert ret != None and ret == DEVICE_NODE_ID
ret = connect_device_over_ble(devCtrl, int(device_details["Discriminator"]), int(device_details["SetUpPINCode"]), DEVICE_NODE_ID)
assert ret != None and ret == DEVICE_NODE_ID

ret = device.wait_for_output("Device completed Rendezvous process")
assert ret != None and len(ret) > 0

ret = commissioning_wifi(devCtrl, network_ssid, network_pass, DEVICE_NODE_ID)
assert ret == 0

0 comments on commit 1029216

Please sign in to comment.