From 2b89dc01af93e49dc0d5ff6d5c804114d904cdf4 Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Fri, 9 Apr 2021 16:20:24 +0300 Subject: [PATCH] New API: accept metadata with unrecognized fields In order to support ADR 0008 we would want to accept unrecognized fields in all metadata classes, including the classes that would be added representing a subportion of a role like "meta", "delegations" and "roles". Also, we should test that we support unrecognized fields when adding new classes or modifying existing ones to make sure we support ADR 0008. Signed-off-by: Martin Vrachev --- tests/test_api.py | 33 ++++++++++++++++++++++++++++ tuf/api/metadata.py | 52 +++++++++++++++++++++++++++++++++++++-------- 2 files changed, 76 insertions(+), 9 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 24f6ace24a..36581a9cf9 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -330,6 +330,39 @@ def test_metadata_targets(self): # Verify that data is updated self.assertEqual(targets.signed.targets[filename], fileinfo) + def setup_dict_with_unrecognized_field(self, file_path, field, value): + json_dict = {} + with open(file_path) as f: + json_dict = json.loads(f.read()) + # We are changing the json dict without changing the signature. + # This could be a problem if we want to do verification on this dict. + json_dict["signed"][field] = value + return json_dict + + def test_support_for_unrecognized_fields(self): + for metadata in ["root", "timestamp", "snapshot", "targets"]: + path = os.path.join(self.repo_dir, "metadata", metadata + ".json") + dict1 = self.setup_dict_with_unrecognized_field(path, "f", "b") + # Test that the metadata classes store unrecognized fields when + # initializing and passes them when casting the instance to a dict. + + # TODO: Remove the deepcopy when Metadata.from_dict() doesn't have + # the side effect to destroy the passed dictionary. + temp_copy = copy.deepcopy(dict1) + metadata_obj = Metadata.from_dict(temp_copy) + + self.assertEqual(dict1["signed"], metadata_obj.signed.to_dict()) + + # Test that two instances of the same class could have different + # unrecognized fields. + dict2 = self.setup_dict_with_unrecognized_field(path, "f2", "b2") + temp_copy2 = copy.deepcopy(dict2) + metadata_obj2 = Metadata.from_dict(temp_copy2) + self.assertNotEqual( + metadata_obj.signed.to_dict(), metadata_obj2.signed.to_dict() + ) + + # Run unit test. if __name__ == '__main__': utils.configure_test_logging(sys.argv) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index 99efd9b510..30300ac2c4 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -300,6 +300,9 @@ class Signed: spec_version: The TUF specification version number (semver) the metadata format adheres to. expires: The metadata expiration datetime object. + unrecognized_fields: An optional dictionary storing all unrecognized + fields. Used for backward compatibility. If None is provided, an + empty dictionary will be created. """ @@ -307,7 +310,12 @@ class Signed: # we keep it to match spec terminology (I often refer to this as "payload", # or "inner metadata") def __init__( - self, _type: str, version: int, spec_version: str, expires: datetime + self, + _type: str, + version: int, + spec_version: str, + expires: datetime, + unrecognized_fields: Optional[Mapping[str, Any]] = None, ) -> None: self._type = _type @@ -318,6 +326,9 @@ def __init__( if version < 0: raise ValueError(f"version must be >= 0, got {version}") self.version = version + if unrecognized_fields is None: + unrecognized_fields = {} + self.unrecognized_fields = unrecognized_fields @staticmethod def _common_fields_from_dict(signed_dict: Mapping[str, Any]) -> list: @@ -349,6 +360,7 @@ def _common_fields_to_dict(self) -> Dict[str, Any]: "version": self.version, "spec_version": self.spec_version, "expires": self.expires.isoformat() + "Z", + **self.unrecognized_fields, } # Modification. @@ -409,8 +421,11 @@ def __init__( consistent_snapshot: bool, keys: Mapping[str, Any], roles: Mapping[str, Any], + unrecognized_fields: Optional[Mapping[str, Any]] = None, ) -> None: - super().__init__(_type, version, spec_version, expires) + super().__init__( + _type, version, spec_version, expires, unrecognized_fields + ) # TODO: Add classes for keys and roles self.consistent_snapshot = consistent_snapshot self.keys = keys @@ -423,7 +438,11 @@ def from_dict(cls, root_dict: Mapping[str, Any]) -> "Root": consistent_snapshot = root_dict.pop("consistent_snapshot") keys = root_dict.pop("keys") roles = root_dict.pop("roles") - return cls(*common_args, consistent_snapshot, keys, roles) + # All fields left in the root_dict and unrecognized. + unrecognized_fields = root_dict + return cls( + *common_args, consistent_snapshot, keys, roles, unrecognized_fields + ) def to_dict(self) -> Dict[str, Any]: """Returns the dict representation of self. """ @@ -485,8 +504,11 @@ def __init__( spec_version: str, expires: datetime, meta: Mapping[str, Any], + unrecognized_fields: Optional[Mapping[str, Any]] = None, ) -> None: - super().__init__(_type, version, spec_version, expires) + super().__init__( + _type, version, spec_version, expires, unrecognized_fields + ) # TODO: Add class for meta self.meta = meta @@ -495,7 +517,9 @@ def from_dict(cls, timestamp_dict: Mapping[str, Any]) -> "Timestamp": """Creates Timestamp object from its dict representation. """ common_args = cls._common_fields_from_dict(timestamp_dict) meta = timestamp_dict.pop("meta") - return cls(*common_args, meta) + # All fields left in the timestamp_dict and unrecognized. + unrecognized_fields = timestamp_dict + return cls(*common_args, meta, unrecognized_fields) def to_dict(self) -> Dict[str, Any]: """Returns the dict representation of self. """ @@ -549,8 +573,11 @@ def __init__( spec_version: str, expires: datetime, meta: Mapping[str, Any], + unrecognized_fields: Optional[Mapping[str, Any]] = None, ) -> None: - super().__init__(_type, version, spec_version, expires) + super().__init__( + _type, version, spec_version, expires, unrecognized_fields + ) # TODO: Add class for meta self.meta = meta @@ -559,7 +586,9 @@ def from_dict(cls, snapshot_dict: Mapping[str, Any]) -> "Snapshot": """Creates Snapshot object from its dict representation. """ common_args = cls._common_fields_from_dict(snapshot_dict) meta = snapshot_dict.pop("meta") - return cls(*common_args, meta) + # All fields left in the snapshot_dict and unrecognized. + unrecognized_fields = snapshot_dict + return cls(*common_args, meta, unrecognized_fields) def to_dict(self) -> Dict[str, Any]: """Returns the dict representation of self. """ @@ -652,8 +681,11 @@ def __init__( expires: datetime, targets: Mapping[str, Any], delegations: Mapping[str, Any], + unrecognized_fields: Optional[Mapping[str, Any]] = None, ) -> None: - super().__init__(_type, version, spec_version, expires) + super().__init__( + _type, version, spec_version, expires, unrecognized_fields + ) # TODO: Add class for meta self.targets = targets self.delegations = delegations @@ -664,7 +696,9 @@ def from_dict(cls, targets_dict: Mapping[str, Any]) -> "Targets": common_args = cls._common_fields_from_dict(targets_dict) targets = targets_dict.pop("targets") delegations = targets_dict.pop("delegations") - return cls(*common_args, targets, delegations) + # All fields left in the targets_dict and unrecognized. + unrecognized_fields = targets_dict + return cls(*common_args, targets, delegations, unrecognized_fields) def to_dict(self) -> Dict[str, Any]: """Returns the dict representation of self. """