Skip to content

Commit

Permalink
Add support for > 1 DUT for TC-DA-1.7 (#25928)
Browse files Browse the repository at this point in the history
Allows the user to specify multiple DUTs by using a list of
discriminators and passcodes on the command line. Forces the
use of 2 DUTS for this test and compares the PKs.
  • Loading branch information
cecille authored and pull[bot] committed Jul 11, 2023
1 parent ee489a2 commit 3455381
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 54 deletions.
77 changes: 48 additions & 29 deletions src/python_testing/TC_DA_1_7.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ def extract_akid(cert: Certificate) -> Optional[bytes]:
class TC_DA_1_7(MatterBaseTest):
@async_test_body
async def test_TC_DA_1_7(self):
# For real tests, we require more than one DUT
# On the CI, this doesn't make sense to do since all the examples use the same DAC
# To specify more than 1 DUT, use a list of discriminators and passcodes
allow_sdk_dac = self.user_params.get("allow_sdk_dac", False)
if allow_sdk_dac:
asserts.assert_equal(len(self.matter_test_config.discriminator), 1, "Only one device can be tested with SDK DAC")
if not allow_sdk_dac:
asserts.assert_equal(len(self.matter_test_config.discriminator), 2, "This test requires 2 DUTs")
pk = []
for i in range(len(self.matter_test_config.dut_node_id)):
pk.append(await self.single_DUT(i, self.matter_test_config.dut_node_id[i]))

asserts.assert_equal(len(pk), len(set(pk)), "Found matching public keys in different DUTs")

async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes:
# Option to allow SDK roots (skip step 4 check 2)
allow_sdk_dac = self.user_params.get("allow_sdk_dac", False)

Expand All @@ -74,52 +89,56 @@ async def test_TC_DA_1_7(self):
paa_by_skid = load_all_paa(conf.paa_trust_store_path)
logging.info("Found %d PAAs" % len(paa_by_skid))

logging.info("Step 1: Commissioning, already done")
logging.info("DUT {} Step 1: Commissioning, already done".format(dut_index))
dev_ctrl = self.default_controller

logging.info("Step 2: Get PAI of DUT1 with certificate chain request")
result = await dev_ctrl.SendCommand(self.dut_node_id, 0,
logging.info("DUT {} Step 2: Get PAI of DUT1 with certificate chain request".format(dut_index))
result = await dev_ctrl.SendCommand(dut_node_id, 0,
Clusters.OperationalCredentials.Commands.CertificateChainRequest(2))
pai_1 = result.certificate
asserts.assert_less_equal(len(pai_1), 600, "PAI cert must be at most 600 bytes")
self.record_data({"pai_1": hex_from_bytes(pai_1)})
pai = result.certificate
asserts.assert_less_equal(len(pai), 600, "PAI cert must be at most 600 bytes")
key = 'pai_{}'.format(dut_index)
self.record_data({key: hex_from_bytes(pai)})

logging.info("Step 3: Get DAC of DUT1 with certificate chain request")
result = await dev_ctrl.SendCommand(self.dut_node_id, 0,
logging.info("DUT {} Step 3: Get DAC of DUT1 with certificate chain request".format(dut_index))
result = await dev_ctrl.SendCommand(dut_node_id, 0,
Clusters.OperationalCredentials.Commands.CertificateChainRequest(1))
dac_1 = result.certificate
asserts.assert_less_equal(len(dac_1), 600, "DAC cert must be at most 600 bytes")
self.record_data({"dac_1": hex_from_bytes(dac_1)})

logging.info("Step 4 check 1: Ensure PAI's AKID matches a PAA and signature is valid")
pai1_cert = load_der_x509_certificate(pai_1)
pai1_akid = extract_akid(pai1_cert)
if pai1_akid not in paa_by_skid:
asserts.fail("DUT1's PAI (%s) not matched in PAA trust store" % hex_from_bytes(pai1_akid))

filename, paa_cert = paa_by_skid[pai1_akid]
dac = result.certificate
asserts.assert_less_equal(len(dac), 600, "DAC cert must be at most 600 bytes")
key = 'dac_{}'.format(dut_index)
self.record_data({key: hex_from_bytes(dac)})

logging.info("DUT {} Step 4 check 1: Ensure PAI's AKID matches a PAA and signature is valid".format(dut_index))
pai_cert = load_der_x509_certificate(pai)
pai_akid = extract_akid(pai_cert)
if pai_akid not in paa_by_skid:
asserts.fail("DUT %d PAI (%s) not matched in PAA trust store" % (dut_index, hex_from_bytes(pai_akid)))

filename, paa_cert = paa_by_skid[pai_akid]
logging.info("Matched PAA file %s, subject: %s" % (filename, paa_cert.subject))
public_key = paa_cert.public_key()

try:
public_key.verify(signature=pai1_cert.signature, data=pai1_cert.tbs_certificate_bytes,
public_key.verify(signature=pai_cert.signature, data=pai_cert.tbs_certificate_bytes,
signature_algorithm=ec.ECDSA(hashes.SHA256()))
except InvalidSignature as e:
asserts.fail("Failed to verify PAI signature against PAA public key: %s" % str(e))
asserts.fail("DUT %d: Failed to verify PAI signature against PAA public key: %s" % (dut_index, str(e)))
logging.info("Validated PAI signature against PAA")

logging.info("Step 4 check 2: Verify PAI AKID not in denylist of SDK PAIs")
logging.info("DUT {} Step 4 check 2: Verify PAI AKID not in denylist of SDK PAIs".format(dut_index))
if allow_sdk_dac:
logging.warn("===> TEST STEP SKIPPED: Allowing SDK DACs!")
else:
for candidate in FORBIDDEN_AKID:
asserts.assert_not_equal(hex_from_bytes(pai1_akid), hex_from_bytes(candidate), "PAI AKID must not be in denylist")

logging.info("Step 5: Extract subject public key of DAC and save")
dac1_cert = load_der_x509_certificate(dac_1)
pk_1 = dac1_cert.public_key().public_bytes(encoding=Encoding.X962, format=PublicFormat.UncompressedPoint)
logging.info("Subject public key pk_1: %s" % hex_from_bytes(pk_1))
self.record_data({"pk_1": hex_from_bytes(pk_1)})
asserts.assert_not_equal(hex_from_bytes(pai_akid), hex_from_bytes(candidate), "PAI AKID must not be in denylist")

logging.info("DUT {} Step 5: Extract subject public key of DAC and save".format(dut_index))
dac_cert = load_der_x509_certificate(dac)
pk = dac_cert.public_key().public_bytes(encoding=Encoding.X962, format=PublicFormat.UncompressedPoint)
logging.info("Subject public key pk: %s" % hex_from_bytes(pk))
key = 'pk_{}'.format(dut_index)
self.record_data({key: hex_from_bytes(pk)})
return pk


if __name__ == "__main__":
Expand Down
73 changes: 48 additions & 25 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ class MatterTestConfig:
tests: List[str] = field(default_factory=list)

commissioning_method: str = None
discriminator: int = None
setup_passcode: int = None
discriminator: List[int] = None
setup_passcode: List[int] = None
commissionee_ip_address_just_for_testing: str = None
maximize_cert_chains: bool = False

Expand All @@ -145,7 +145,7 @@ class MatterTestConfig:
thread_operational_dataset: str = None

# Node ID for basic DUT
dut_node_id: int = _DEFAULT_DUT_NODE_ID
dut_node_id: List[int] = None
# Node ID to use for controller/commissioner
controller_node_id: int = _DEFAULT_CONTROLLER_NODE_ID
# CAT Tags for default controller/commissioner
Expand Down Expand Up @@ -264,7 +264,7 @@ def certificate_authority_manager(self) -> chip.CertificateAuthority.Certificate

@property
def dut_node_id(self) -> int:
return self.matter_test_config.dut_node_id
return self.matter_test_config.dut_node_id[0]

async def read_single_attribute(
self, dev_ctrl: ChipDeviceCtrl, node_id: int, endpoint: int, attribute: object, fabricFiltered: bool = True) -> object:
Expand Down Expand Up @@ -489,6 +489,28 @@ def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConf
print("error: Cannot have both --qr-code and --manual-code present!")
return False

if len(config.discriminator) != len(config.setup_passcode):
print("error: supplied number of discriminators does not match number of passcodes")
return False

if len(config.dut_node_id) > len(config.discriminator):
print("error: More node IDs provided than discriminators")
return False

if len(config.dut_node_id) < len(config.discriminator):
missing = len(config.discriminator) - len(config.dut_node_id)
for i in range(missing):
config.dut_node_id.append(config.dut_node_id[-1] + 1)

if len(config.dut_node_id) != len(set(config.dut_node_id)):
print("error: Duplicate values in node id list")
return False

if len(config.discriminator) != len(set(config.discriminator)):
print("error: Duplicate value in discriminator list")
return False

# TODO: this should also allow multiple once QR and manual codes are supported.
config.qr_code_content = args.qr_code
config.manual_code = args.manual_code

Expand Down Expand Up @@ -591,9 +613,9 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig:
default=_DEFAULT_CONTROLLER_NODE_ID,
help='NodeID to use for initial/default controller (default: %d)' % _DEFAULT_CONTROLLER_NODE_ID)
basic_group.add_argument('-n', '--dut-node-id', type=int_decimal_or_hex,
metavar='NODE_ID', default=_DEFAULT_DUT_NODE_ID,
metavar='NODE_ID', default=[_DEFAULT_DUT_NODE_ID],
help='Node ID for primary DUT communication, '
'and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID)
'and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID, nargs="+")

commission_group = parser.add_argument_group(title="Commissioning", description="Arguments to commission a node")

Expand All @@ -603,13 +625,13 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig:
help='Name of commissioning method to use')
commission_group.add_argument('-d', '--discriminator', type=int_decimal_or_hex,
metavar='LONG_DISCRIMINATOR',
help='Discriminator to use for commissioning')
help='Discriminator to use for commissioning', nargs="+")
commission_group.add_argument('-p', '--passcode', type=int_decimal_or_hex,
metavar='PASSCODE',
help='PAKE passcode to use')
help='PAKE passcode to use', nargs="+")
commission_group.add_argument('-i', '--ip-addr', type=str,
metavar='RAW_IP_ADDRESS',
help='IP address to use (only for method "on-network-ip". ONLY FOR LOCAL TESTING!')
help='IP address to use (only for method "on-network-ip". ONLY FOR LOCAL TESTING!', nargs="+")

commission_group.add_argument('--wifi-ssid', type=str,
metavar='SSID',
Expand Down Expand Up @@ -692,46 +714,47 @@ class CommissionDeviceTest(MatterBaseTest):

def test_run_commissioning(self):
conf = self.matter_test_config
logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" %
(conf.root_of_trust_index, conf.fabric_id, conf.dut_node_id))
logging.info("Commissioning method: %s" % conf.commissioning_method)
for i in range(len(conf.dut_node_id)):
logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" %
(conf.root_of_trust_index, conf.fabric_id, conf.dut_node_id[i]))
logging.info("Commissioning method: %s" % conf.commissioning_method)

if not self._commission_device():
raise signals.TestAbortAll("Failed to commission node")
if not self._commission_device(i):
raise signals.TestAbortAll("Failed to commission node")

def _commission_device(self) -> bool:
def _commission_device(self, i) -> bool:
dev_ctrl = self.default_controller
conf = self.matter_test_config

# TODO: support by manual code and QR

if conf.commissioning_method == "on-network":
return dev_ctrl.CommissionOnNetwork(
nodeId=conf.dut_node_id,
setupPinCode=conf.setup_passcode,
nodeId=conf.dut_node_id[i],
setupPinCode=conf.setup_passcode[i],
filterType=DiscoveryFilterType.LONG_DISCRIMINATOR,
filter=conf.discriminator
filter=conf.discriminator[i]
)
elif conf.commissioning_method == "ble-wifi":
return dev_ctrl.CommissionWiFi(
conf.discriminator,
conf.setup_passcode,
conf.dut_node_id,
conf.discriminator[i],
conf.setup_passcode[i],
conf.dut_node_id[i],
conf.wifi_ssid,
conf.wifi_passphrase
)
elif conf.commissioning_method == "ble-thread":
return dev_ctrl.CommissionThread(
conf.discriminator,
conf.setup_passcode,
conf.dut_node_id,
conf.discriminator[i],
conf.setup_passcode[i],
conf.dut_node_id[i],
conf.thread_operational_dataset
)
elif conf.commissioning_method == "on-network-ip":
logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====")
return dev_ctrl.CommissionIP(
ipaddr=conf.commissionee_ip_address_just_for_testing,
setupPinCode=conf.setup_passcode, nodeid=conf.dut_node_id
setupPinCode=conf.setup_passcode[i], nodeid=conf.dut_node_id[i]
)
else:
raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)
Expand Down

0 comments on commit 3455381

Please sign in to comment.