From 9e9616d48f87195eac1b13e670d5550aee5a82fe Mon Sep 17 00:00:00 2001 From: Maximilian Falkenstein Date: Wed, 24 Jan 2024 21:02:57 +0100 Subject: [PATCH 1/7] KMS: Stop assuming encrypt/decrypt on key import AWS KMS allows importing keys for any of the supported KMS operations, not just encryption/decryption. This was already fixed for GetParametersForImport (localstack/localstack#9111), however, the check for ImportKeyMaterial remained - lets drop it. --- localstack/services/kms/provider.py | 1 - 1 file changed, 1 deletion(-) diff --git a/localstack/services/kms/provider.py b/localstack/services/kms/provider.py index 7519d8b909ce8..e439f00e89338 100644 --- a/localstack/services/kms/provider.py +++ b/localstack/services/kms/provider.py @@ -1073,7 +1073,6 @@ def import_key_material( enabled_key_allowed=True, disabled_key_allowed=True, ) - self._validate_key_for_encryption_decryption(context, key_to_import_material_to) if import_state.wrapping_algo == AlgorithmSpec.RSAES_PKCS1_V1_5: decrypt_padding = padding.PKCS1v15() From 41e074e53509e576eaaf185867b1de967520767b Mon Sep 17 00:00:00 2001 From: Maximilian Falkenstein Date: Wed, 24 Jan 2024 21:06:47 +0100 Subject: [PATCH 2/7] KMS: Import key material to the correct place When calling ImportKeyMaterial, we cannot assume that we always import a symmetric key. Check for this case in KmsCryptoKey and make sure we set the correct member. Also note that not all asymmetric keys are RSA. --- localstack/services/kms/models.py | 26 ++++++++++++++++++++++++-- localstack/services/kms/provider.py | 3 ++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/localstack/services/kms/models.py b/localstack/services/kms/models.py index 23e49516a4fe3..d16eb1308d860 100644 --- a/localstack/services/kms/models.py +++ b/localstack/services/kms/models.py @@ -17,7 +17,12 @@ from cryptography.hazmat.primitives import hashes, hmac from cryptography.hazmat.primitives import serialization as crypto_serialization from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa, utils +from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15, PSS from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey +from cryptography.hazmat.primitives.asymmetric.utils import Prehashed +from cryptography.hazmat.primitives.hashes import SHA256, SHA384, SHA512 +from cryptography.hazmat.primitives.serialization import load_der_private_key from localstack.aws.api.kms import ( CreateAliasRequest, @@ -156,6 +161,7 @@ class KmsCryptoKey: public_key: Optional[bytes] private_key: Optional[bytes] key_material: bytes + key_spec: str def __init__(self, key_spec: str): self.private_key = None @@ -163,8 +169,9 @@ def __init__(self, key_spec: str): # Technically, key_material, being a symmetric encryption key, is only relevant for # key_spec == SYMMETRIC_DEFAULT. # But LocalStack uses symmetric encryption with this key_material even for other specs. Asymmetric keys are - # generated, but are not actually used. + # generated, but are not actually used for encryption. Signing is different. self.key_material = os.urandom(SYMMETRIC_DEFAULT_MATERIAL_LENGTH) + self.key_spec = key_spec if key_spec == "SYMMETRIC_DEFAULT": return @@ -201,8 +208,23 @@ def __init__(self, key_spec: str): crypto_serialization.PublicFormat.SubjectPublicKeyInfo, ) + def load_key_material(self, material: bytes): + if self.key_spec == "SYMMETRIC_DEFAULT": + self.key_material = material + else: + key = load_der_private_key(material, password=None) + self.public_key = key.public_key().public_bytes( + crypto_serialization.Encoding.DER, + crypto_serialization.PublicFormat.SubjectPublicKeyInfo, + ) + self.private_key = key.private_bytes( + crypto_serialization.Encoding.DER, + crypto_serialization.PrivateFormat.PKCS8, + crypto_serialization.NoEncryption(), + ) + @property - def key(self) -> RSAPrivateKey: + def key(self) -> RSAPrivateKey | EllipticCurvePrivateKey: return crypto_serialization.load_der_private_key( self.private_key, password=None, diff --git a/localstack/services/kms/provider.py b/localstack/services/kms/provider.py index e439f00e89338..d073faf993990 100644 --- a/localstack/services/kms/provider.py +++ b/localstack/services/kms/provider.py @@ -1105,9 +1105,10 @@ def import_key_material( "A validTo date must be set if the ExpirationModel is KEY_MATERIAL_EXPIRES" ) # TODO actually set validTo and make the key expire - key_to_import_material_to.crypto_key.key_material = key_material key_to_import_material_to.metadata["Enabled"] = True key_to_import_material_to.metadata["KeyState"] = KeyState.Enabled + key_to_import_material_to.crypto_key.load_key_material(key_material) + return ImportKeyMaterialResponse() def delete_imported_key_material( From f7e3294e6c7bafe12051a442ab9bc27c81d65edd Mon Sep 17 00:00:00 2001 From: Maximilian Falkenstein Date: Wed, 24 Jan 2024 21:09:55 +0100 Subject: [PATCH 3/7] KMS: Stop using RSAES_PKCS1_V1_5 in test RSAES_PKCS1_V1_5 was dropped from the API on October 10th 2023 (https://docs.aws.amazon.com/kms/latest/developerguide/importing-keys-get-public-key-and-token.html#select-wrapping-algorithm) --- tests/aws/services/kms/test_kms.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/aws/services/kms/test_kms.py b/tests/aws/services/kms/test_kms.py index 2c7b7f6a0d759..a8f8fd7e20c4d 100644 --- a/tests/aws/services/kms/test_kms.py +++ b/tests/aws/services/kms/test_kms.py @@ -8,7 +8,8 @@ import pytest from botocore.config import Config from botocore.exceptions import ClientError -from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 +from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives.asymmetric import ec, padding from cryptography.hazmat.primitives.serialization import load_der_public_key from localstack.services.kms.utils import get_hash_algorithm @@ -559,7 +560,7 @@ def test_describe_and_list_sign_key(self, kms_create_key, aws_client): assert key_id in _get_all_key_ids(aws_client.kms) @markers.aws.validated - def test_import_key(self, kms_create_key, aws_client, snapshot): + def test_import_key_symmetric(self, kms_create_key, aws_client, snapshot): snapshot.add_transformer(snapshot.transform.key_value("Description")) key = kms_create_key(Origin="EXTERNAL") snapshot.match("created-key", key) @@ -573,7 +574,7 @@ def test_import_key(self, kms_create_key, aws_client, snapshot): # get key import params params = aws_client.kms.get_parameters_for_import( - KeyId=key_id, WrappingAlgorithm="RSAES_PKCS1_V1_5", WrappingKeySpec="RSA_2048" + KeyId=key_id, WrappingAlgorithm="RSAES_OAEP_SHA_256", WrappingKeySpec="RSA_2048" ) assert params["KeyId"] == key["Arn"] assert params["ImportToken"] @@ -586,7 +587,12 @@ def test_import_key(self, kms_create_key, aws_client, snapshot): # import symmetric key (key material) into KMS public_key = load_der_public_key(params["PublicKey"]) - encrypted_key = public_key.encrypt(symmetric_key, PKCS1v15()) + encrypted_key = public_key.encrypt(symmetric_key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + )) describe_key_before_import = aws_client.kms.describe_key(KeyId=key_id) snapshot.match("describe-key-before-import", describe_key_before_import) From d7b1b70b851145eab1907b5376b1623a907e48d8 Mon Sep 17 00:00:00 2001 From: Maximilian Falkenstein Date: Wed, 24 Jan 2024 21:12:59 +0100 Subject: [PATCH 4/7] KMS: Split up _construct_sign_verify_kwargs Unfortunately, the _name_ of the third argument to key.sign() appears to be _platform dependant_. Split up the argument construction so that we can avoid referring to it by name. --- localstack/services/kms/models.py | 43 +++++++++++++++++-------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/localstack/services/kms/models.py b/localstack/services/kms/models.py index d16eb1308d860..c4567a91680ea 100644 --- a/localstack/services/kms/models.py +++ b/localstack/services/kms/models.py @@ -303,9 +303,13 @@ def decrypt( def sign( self, data: bytes, message_type: MessageType, signing_algorithm: SigningAlgorithmSpec ) -> bytes: - kwargs = self._construct_sign_verify_kwargs(signing_algorithm, message_type) + hasher, wrapped_hasher = self._construct_sign_verify_hasher(signing_algorithm, message_type) try: - return self.crypto_key.key.sign(data=data, **kwargs) + if signing_algorithm.startswith("ECDSA"): + return self.crypto_key.key.sign(data, ec.ECDSA(wrapped_hasher)) + else: + padding = self._construct_sign_verify_padding(signing_algorithm, hasher) + return self.crypto_key.key.sign(data, padding, wrapped_hasher) except ValueError as exc: raise ValidationException(str(exc)) @@ -316,9 +320,13 @@ def verify( signing_algorithm: SigningAlgorithmSpec, signature: bytes, ) -> bool: - kwargs = self._construct_sign_verify_kwargs(signing_algorithm, message_type) + hasher, wrapped_hasher = self._construct_sign_verify_hasher(signing_algorithm, message_type) try: - self.crypto_key.key.public_key().verify(signature=signature, data=data, **kwargs) + if signing_algorithm.startswith("ECDSA"): + self.crypto_key.key.public_key().verify(signature, data, ec.ECDSA(wrapped_hasher)) + else: + padding = self._construct_sign_verify_padding(signing_algorithm, hasher) + self.crypto_key.key.public_key().verify(signature, data, padding, wrapped_hasher) return True except ValueError as exc: raise ValidationException(str(exc)) @@ -343,11 +351,9 @@ def _get_hmac_context(self, mac_algorithm: MacAlgorithmSpec) -> hmac.HMAC: ) return h - def _construct_sign_verify_kwargs( - self, signing_algorithm: SigningAlgorithmSpec, message_type: MessageType - ) -> Dict: - kwargs = {} - + def _construct_sign_verify_hasher( + self, signing_algorithm: SigningAlgorithmSpec, message_type: MessageType + ) -> (Prehashed | SHA256 | SHA384 | SHA512, Prehashed | SHA256 | SHA384 | SHA512): if "SHA_256" in signing_algorithm: hasher = hashes.SHA256() elif "SHA_384" in signing_algorithm: @@ -359,27 +365,24 @@ def _construct_sign_verify_kwargs( f"Unsupported hash type in SigningAlgorithm '{signing_algorithm}'" ) + wrapped_hasher = hasher if message_type == MessageType.DIGEST: - kwargs["algorithm"] = utils.Prehashed(hasher) - else: - kwargs["algorithm"] = hasher - - if signing_algorithm.startswith("ECDSA"): - kwargs["signature_algorithm"] = ec.ECDSA(algorithm=kwargs.pop("algorithm", None)) - return kwargs + wrapped_hasher = utils.Prehashed(hasher) + return hasher, wrapped_hasher + def _construct_sign_verify_padding( + self, signing_algorithm: SigningAlgorithmSpec, hasher: Prehashed | SHA256 | SHA384 | SHA512 + ) -> PKCS1v15 | PSS: if signing_algorithm.startswith("RSA"): if "PKCS" in signing_algorithm: - kwargs["padding"] = padding.PKCS1v15() + return padding.PKCS1v15() elif "PSS" in signing_algorithm: - kwargs["padding"] = padding.PSS( + return padding.PSS( mgf=padding.MGF1(hasher), salt_length=padding.PSS.MAX_LENGTH ) else: LOG.warning("Unsupported padding in SigningAlgorithm '%s'", signing_algorithm) - return kwargs - # Not a comment, rather some possibly relevant links for the future. # https://docs.aws.amazon.com/kms/latest/developerguide/asymm-create-key.html # "You cannot create an elliptic curve key pair for encryption and decryption." From 330a537c4dcf02318b2d1df091eba7b76521711d Mon Sep 17 00:00:00 2001 From: Maximilian Falkenstein Date: Wed, 24 Jan 2024 21:14:02 +0100 Subject: [PATCH 5/7] KMS: Add test for imported ecc key sign/verify Much like the case for symmetric keys, this test generates a sample key and imports it, but uses it for sign/verify instead of encrypt/decrypt. --- tests/aws/services/kms/test_kms.py | 59 ++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/tests/aws/services/kms/test_kms.py b/tests/aws/services/kms/test_kms.py index a8f8fd7e20c4d..200ca8f272a27 100644 --- a/tests/aws/services/kms/test_kms.py +++ b/tests/aws/services/kms/test_kms.py @@ -627,6 +627,65 @@ def test_import_key_symmetric(self, kms_create_key, aws_client, snapshot): aws_client.kms.encrypt(Plaintext=plaintext, KeyId=key_id) snapshot.match("encrypt-after-delete-error", e.value.response) + @markers.aws.validated + def test_import_key_asymmetric(self, kms_create_key, aws_client, snapshot): + snapshot.add_transformer(snapshot.transform.key_value("Description")) + key = kms_create_key(Origin="EXTERNAL", KeySpec="ECC_NIST_P256", KeyUsage="SIGN_VERIFY") + snapshot.match("created-key", key) + key_id = key["KeyId"] + + crypto_key = ec.generate_private_key(ec.SECP256R1()) + raw_private_key = crypto_key.private_bytes( + serialization.Encoding.DER, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + raw_public_key = crypto_key.public_key().public_bytes( + serialization.Encoding.DER, + serialization.PublicFormat.SubjectPublicKeyInfo, + ) + plaintext = b"test content 123 !#" + + # get key import params + params = aws_client.kms.get_parameters_for_import( + KeyId=key_id, WrappingAlgorithm="RSAES_OAEP_SHA_256", WrappingKeySpec="RSA_2048" + ) + + # import asymmetric key (key material) into KMS + public_key = load_der_public_key(params["PublicKey"]) + encrypted_key = public_key.encrypt(raw_private_key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + )) + describe_key_before_import = aws_client.kms.describe_key(KeyId=key_id) + snapshot.match("describe-key-before-import", describe_key_before_import) + + aws_client.kms.import_key_material( + KeyId=key_id, + ImportToken=params["ImportToken"], + EncryptedKeyMaterial=encrypted_key, + ExpirationModel="KEY_MATERIAL_DOES_NOT_EXPIRE", + ) + describe_key_after_import = aws_client.kms.describe_key(KeyId=key_id) + snapshot.match("describe-key-after-import", describe_key_after_import) + + # Check whether public key is derived correctly + get_public_key_after_import = aws_client.kms.get_public_key(KeyId=key_id) + assert get_public_key_after_import["PublicKey"] == raw_public_key + + # Do a sign/verify cycle + signed_data = aws_client.kms.sign(Message=plaintext, MessageType="RAW", SigningAlgorithm="ECDSA_SHA_256", + KeyId=key_id) + verify_data = aws_client.kms.verify(Message=plaintext, Signature=signed_data["Signature"], MessageType="RAW", + SigningAlgorithm="ECDSA_SHA_256", KeyId=key_id) + assert verify_data["SignatureValid"] + + aws_client.kms.delete_imported_key_material(KeyId=key_id) + describe_key_after_deleted_import = aws_client.kms.describe_key(KeyId=key_id) + snapshot.match("describe-key-after-deleted-import", describe_key_after_deleted_import) + @markers.aws.validated def test_list_aliases_of_key(self, kms_create_key, kms_create_alias, aws_client): aliased_key_id = kms_create_key()["KeyId"] From e2176484bd16c8f7392fec2b523d7c0390c722be Mon Sep 17 00:00:00 2001 From: Maximilian Falkenstein Date: Thu, 14 Mar 2024 09:29:29 +0100 Subject: [PATCH 6/7] KMS: Update snapshots --- tests/aws/services/kms/test_kms.snapshot.json | 100 +++++++++++++++++- .../aws/services/kms/test_kms.validation.json | 7 +- 2 files changed, 103 insertions(+), 4 deletions(-) diff --git a/tests/aws/services/kms/test_kms.snapshot.json b/tests/aws/services/kms/test_kms.snapshot.json index 15eb8690809c0..3dbcbb51da413 100644 --- a/tests/aws/services/kms/test_kms.snapshot.json +++ b/tests/aws/services/kms/test_kms.snapshot.json @@ -1207,8 +1207,8 @@ } } }, - "tests/aws/services/kms/test_kms.py::TestKMS::test_import_key": { - "recorded-date": "13-04-2023, 11:30:39", + "tests/aws/services/kms/test_kms.py::TestKMS::test_import_key_symmetric": { + "recorded-date": "24-01-2024, 10:44:12", "recorded-content": { "created-key": { "AWSAccountId": "111111111111", @@ -1335,6 +1335,102 @@ } } }, + "tests/aws/services/kms/test_kms.py::TestKMS::test_import_key_asymmetric": { + "recorded-date": "24-01-2024, 10:44:14", + "recorded-content": { + "created-key": { + "AWSAccountId": "111111111111", + "Arn": "arn:aws:kms::111111111111:key/", + "CreationDate": "datetime", + "CustomerMasterKeySpec": "ECC_NIST_P256", + "Description": "", + "Enabled": false, + "KeyId": "", + "KeyManager": "CUSTOMER", + "KeySpec": "ECC_NIST_P256", + "KeyState": "PendingImport", + "KeyUsage": "SIGN_VERIFY", + "MultiRegion": false, + "Origin": "EXTERNAL", + "SigningAlgorithms": [ + "ECDSA_SHA_256" + ] + }, + "describe-key-before-import": { + "KeyMetadata": { + "AWSAccountId": "111111111111", + "Arn": "arn:aws:kms::111111111111:key/", + "CreationDate": "datetime", + "CustomerMasterKeySpec": "ECC_NIST_P256", + "Description": "", + "Enabled": false, + "KeyId": "", + "KeyManager": "CUSTOMER", + "KeySpec": "ECC_NIST_P256", + "KeyState": "PendingImport", + "KeyUsage": "SIGN_VERIFY", + "MultiRegion": false, + "Origin": "EXTERNAL", + "SigningAlgorithms": [ + "ECDSA_SHA_256" + ] + }, + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "describe-key-after-import": { + "KeyMetadata": { + "AWSAccountId": "111111111111", + "Arn": "arn:aws:kms::111111111111:key/", + "CreationDate": "datetime", + "CustomerMasterKeySpec": "ECC_NIST_P256", + "Description": "", + "Enabled": true, + "ExpirationModel": "KEY_MATERIAL_DOES_NOT_EXPIRE", + "KeyId": "", + "KeyManager": "CUSTOMER", + "KeySpec": "ECC_NIST_P256", + "KeyState": "Enabled", + "KeyUsage": "SIGN_VERIFY", + "MultiRegion": false, + "Origin": "EXTERNAL", + "SigningAlgorithms": [ + "ECDSA_SHA_256" + ] + }, + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "describe-key-after-deleted-import": { + "KeyMetadata": { + "AWSAccountId": "111111111111", + "Arn": "arn:aws:kms::111111111111:key/", + "CreationDate": "datetime", + "CustomerMasterKeySpec": "ECC_NIST_P256", + "Description": "", + "Enabled": false, + "KeyId": "", + "KeyManager": "CUSTOMER", + "KeySpec": "ECC_NIST_P256", + "KeyState": "PendingImport", + "KeyUsage": "SIGN_VERIFY", + "MultiRegion": false, + "Origin": "EXTERNAL", + "SigningAlgorithms": [ + "ECDSA_SHA_256" + ] + }, + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + } + } + }, "tests/aws/services/kms/test_kms.py::TestKMS::test_error_messaging_for_invalid_keys": { "recorded-date": "13-04-2023, 11:31:23", "recorded-content": { diff --git a/tests/aws/services/kms/test_kms.validation.json b/tests/aws/services/kms/test_kms.validation.json index 44a68c94a2a45..022ebab386276 100644 --- a/tests/aws/services/kms/test_kms.validation.json +++ b/tests/aws/services/kms/test_kms.validation.json @@ -83,8 +83,11 @@ "tests/aws/services/kms/test_kms.py::TestKMS::test_hmac_create_key_invalid_operations": { "last_validated_date": "2023-04-13T09:31:06+00:00" }, - "tests/aws/services/kms/test_kms.py::TestKMS::test_import_key": { - "last_validated_date": "2023-04-13T09:30:39+00:00" + "tests/aws/services/kms/test_kms.py::TestKMS::test_import_key_asymmetric": { + "last_validated_date": "2024-01-24T10:44:14+00:00" + }, + "tests/aws/services/kms/test_kms.py::TestKMS::test_import_key_symmetric": { + "last_validated_date": "2024-01-24T10:44:12+00:00" }, "tests/aws/services/kms/test_kms.py::TestKMS::test_invalid_generate_mac[HMAC_224-HMAC_SHA_256]": { "last_validated_date": "2023-04-13T09:31:14+00:00" From 229f3240b6f0e697e958cf6fffdddfae420f7327 Mon Sep 17 00:00:00 2001 From: Maximilian Falkenstein Date: Wed, 24 Jan 2024 21:25:52 +0100 Subject: [PATCH 7/7] KMS: Linter warnings --- localstack/services/kms/models.py | 54 ++++++++++++++---------------- tests/aws/services/kms/test_kms.py | 40 ++++++++++++---------- 2 files changed, 48 insertions(+), 46 deletions(-) diff --git a/localstack/services/kms/models.py b/localstack/services/kms/models.py index c4567a91680ea..2f25040a65201 100644 --- a/localstack/services/kms/models.py +++ b/localstack/services/kms/models.py @@ -17,12 +17,10 @@ from cryptography.hazmat.primitives import hashes, hmac from cryptography.hazmat.primitives import serialization as crypto_serialization from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa, utils -from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15, PSS -from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey +from cryptography.hazmat.primitives.asymmetric.padding import PSS, PKCS1v15 +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey from cryptography.hazmat.primitives.asymmetric.utils import Prehashed -from cryptography.hazmat.primitives.hashes import SHA256, SHA384, SHA512 -from cryptography.hazmat.primitives.serialization import load_der_private_key from localstack.aws.api.kms import ( CreateAliasRequest, @@ -198,30 +196,25 @@ def __init__(self, key_spec: str): # but only used in China AWS regions. raise UnsupportedOperationException(f"KeySpec {key_spec} is not supported") - self.private_key = key.private_bytes( - crypto_serialization.Encoding.DER, - crypto_serialization.PrivateFormat.PKCS8, - crypto_serialization.NoEncryption(), - ) - self.public_key = key.public_key().public_bytes( - crypto_serialization.Encoding.DER, - crypto_serialization.PublicFormat.SubjectPublicKeyInfo, - ) + self._serialize_key(key) def load_key_material(self, material: bytes): if self.key_spec == "SYMMETRIC_DEFAULT": self.key_material = material else: - key = load_der_private_key(material, password=None) - self.public_key = key.public_key().public_bytes( - crypto_serialization.Encoding.DER, - crypto_serialization.PublicFormat.SubjectPublicKeyInfo, - ) - self.private_key = key.private_bytes( - crypto_serialization.Encoding.DER, - crypto_serialization.PrivateFormat.PKCS8, - crypto_serialization.NoEncryption(), - ) + key = crypto_serialization.load_der_private_key(material, password=None) + self._serialize_key(key) + + def _serialize_key(self, key: ec.EllipticCurvePrivateKey | rsa.RSAPrivateKey): + self.public_key = key.public_key().public_bytes( + crypto_serialization.Encoding.DER, + crypto_serialization.PublicFormat.SubjectPublicKeyInfo, + ) + self.private_key = key.private_bytes( + crypto_serialization.Encoding.DER, + crypto_serialization.PrivateFormat.PKCS8, + crypto_serialization.NoEncryption(), + ) @property def key(self) -> RSAPrivateKey | EllipticCurvePrivateKey: @@ -352,8 +345,11 @@ def _get_hmac_context(self, mac_algorithm: MacAlgorithmSpec) -> hmac.HMAC: return h def _construct_sign_verify_hasher( - self, signing_algorithm: SigningAlgorithmSpec, message_type: MessageType - ) -> (Prehashed | SHA256 | SHA384 | SHA512, Prehashed | SHA256 | SHA384 | SHA512): + self, signing_algorithm: SigningAlgorithmSpec, message_type: MessageType + ) -> ( + Prehashed | hashes.SHA256 | hashes.SHA384 | hashes.SHA512, + Prehashed | hashes.SHA256 | hashes.SHA384 | hashes.SHA512, + ): if "SHA_256" in signing_algorithm: hasher = hashes.SHA256() elif "SHA_384" in signing_algorithm: @@ -371,15 +367,15 @@ def _construct_sign_verify_hasher( return hasher, wrapped_hasher def _construct_sign_verify_padding( - self, signing_algorithm: SigningAlgorithmSpec, hasher: Prehashed | SHA256 | SHA384 | SHA512 + self, + signing_algorithm: SigningAlgorithmSpec, + hasher: Prehashed | hashes.SHA256 | hashes.SHA384 | hashes.SHA512, ) -> PKCS1v15 | PSS: if signing_algorithm.startswith("RSA"): if "PKCS" in signing_algorithm: return padding.PKCS1v15() elif "PSS" in signing_algorithm: - return padding.PSS( - mgf=padding.MGF1(hasher), salt_length=padding.PSS.MAX_LENGTH - ) + return padding.PSS(mgf=padding.MGF1(hasher), salt_length=padding.PSS.MAX_LENGTH) else: LOG.warning("Unsupported padding in SigningAlgorithm '%s'", signing_algorithm) diff --git a/tests/aws/services/kms/test_kms.py b/tests/aws/services/kms/test_kms.py index 200ca8f272a27..fb4780b6a7699 100644 --- a/tests/aws/services/kms/test_kms.py +++ b/tests/aws/services/kms/test_kms.py @@ -8,7 +8,7 @@ import pytest from botocore.config import Config from botocore.exceptions import ClientError -from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec, padding from cryptography.hazmat.primitives.serialization import load_der_public_key @@ -587,12 +587,12 @@ def test_import_key_symmetric(self, kms_create_key, aws_client, snapshot): # import symmetric key (key material) into KMS public_key = load_der_public_key(params["PublicKey"]) - encrypted_key = public_key.encrypt(symmetric_key, - padding.OAEP( - mgf=padding.MGF1(algorithm=hashes.SHA256()), - algorithm=hashes.SHA256(), - label=None - )) + encrypted_key = public_key.encrypt( + symmetric_key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None + ), + ) describe_key_before_import = aws_client.kms.describe_key(KeyId=key_id) snapshot.match("describe-key-before-import", describe_key_before_import) @@ -653,12 +653,12 @@ def test_import_key_asymmetric(self, kms_create_key, aws_client, snapshot): # import asymmetric key (key material) into KMS public_key = load_der_public_key(params["PublicKey"]) - encrypted_key = public_key.encrypt(raw_private_key, - padding.OAEP( - mgf=padding.MGF1(algorithm=hashes.SHA256()), - algorithm=hashes.SHA256(), - label=None - )) + encrypted_key = public_key.encrypt( + raw_private_key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None + ), + ) describe_key_before_import = aws_client.kms.describe_key(KeyId=key_id) snapshot.match("describe-key-before-import", describe_key_before_import) @@ -676,10 +676,16 @@ def test_import_key_asymmetric(self, kms_create_key, aws_client, snapshot): assert get_public_key_after_import["PublicKey"] == raw_public_key # Do a sign/verify cycle - signed_data = aws_client.kms.sign(Message=plaintext, MessageType="RAW", SigningAlgorithm="ECDSA_SHA_256", - KeyId=key_id) - verify_data = aws_client.kms.verify(Message=plaintext, Signature=signed_data["Signature"], MessageType="RAW", - SigningAlgorithm="ECDSA_SHA_256", KeyId=key_id) + signed_data = aws_client.kms.sign( + Message=plaintext, MessageType="RAW", SigningAlgorithm="ECDSA_SHA_256", KeyId=key_id + ) + verify_data = aws_client.kms.verify( + Message=plaintext, + Signature=signed_data["Signature"], + MessageType="RAW", + SigningAlgorithm="ECDSA_SHA_256", + KeyId=key_id, + ) assert verify_data["SignatureValid"] aws_client.kms.delete_imported_key_material(KeyId=key_id)