Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KMS: Fixup ImportKeyMaterial for non-symmetric keys #10116

Merged
merged 7 commits into from Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
75 changes: 48 additions & 27 deletions localstack/services/kms/models.py
Expand Up @@ -17,7 +17,10 @@
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa, utils
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.hazmat.primitives.asymmetric.padding import PSS, PKCS1v15
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed

from localstack.aws.api.kms import (
CreateAliasRequest,
Expand Down Expand Up @@ -156,15 +159,17 @@ class KmsCryptoKey:
public_key: Optional[bytes]
private_key: Optional[bytes]
key_material: bytes
key_spec: str

def __init__(self, key_spec: str):
self.private_key = None
self.public_key = None
# Technically, key_material, being a symmetric encryption key, is only relevant for
# key_spec == SYMMETRIC_DEFAULT.
# But LocalStack uses symmetric encryption with this key_material even for other specs. Asymmetric keys are
# generated, but are not actually used.
# generated, but are not actually used for encryption. Signing is different.
self.key_material = os.urandom(SYMMETRIC_DEFAULT_MATERIAL_LENGTH)
self.key_spec = key_spec

if key_spec == "SYMMETRIC_DEFAULT":
return
Expand All @@ -191,18 +196,28 @@ def __init__(self, key_spec: str):
# but only used in China AWS regions.
raise UnsupportedOperationException(f"KeySpec {key_spec} is not supported")

self._serialize_key(key)

def load_key_material(self, material: bytes):
if self.key_spec == "SYMMETRIC_DEFAULT":
self.key_material = material
else:
key = crypto_serialization.load_der_private_key(material, password=None)
self._serialize_key(key)

def _serialize_key(self, key: ec.EllipticCurvePrivateKey | rsa.RSAPrivateKey):
self.public_key = key.public_key().public_bytes(
crypto_serialization.Encoding.DER,
crypto_serialization.PublicFormat.SubjectPublicKeyInfo,
)
self.private_key = key.private_bytes(
crypto_serialization.Encoding.DER,
crypto_serialization.PrivateFormat.PKCS8,
crypto_serialization.NoEncryption(),
)
self.public_key = key.public_key().public_bytes(
crypto_serialization.Encoding.DER,
crypto_serialization.PublicFormat.SubjectPublicKeyInfo,
)

@property
def key(self) -> RSAPrivateKey:
def key(self) -> RSAPrivateKey | EllipticCurvePrivateKey:
return crypto_serialization.load_der_private_key(
self.private_key,
password=None,
Expand Down Expand Up @@ -281,9 +296,13 @@ def decrypt(
def sign(
self, data: bytes, message_type: MessageType, signing_algorithm: SigningAlgorithmSpec
) -> bytes:
kwargs = self._construct_sign_verify_kwargs(signing_algorithm, message_type)
hasher, wrapped_hasher = self._construct_sign_verify_hasher(signing_algorithm, message_type)
try:
return self.crypto_key.key.sign(data=data, **kwargs)
if signing_algorithm.startswith("ECDSA"):
return self.crypto_key.key.sign(data, ec.ECDSA(wrapped_hasher))
else:
padding = self._construct_sign_verify_padding(signing_algorithm, hasher)
return self.crypto_key.key.sign(data, padding, wrapped_hasher)
except ValueError as exc:
raise ValidationException(str(exc))

Expand All @@ -294,9 +313,13 @@ def verify(
signing_algorithm: SigningAlgorithmSpec,
signature: bytes,
) -> bool:
kwargs = self._construct_sign_verify_kwargs(signing_algorithm, message_type)
hasher, wrapped_hasher = self._construct_sign_verify_hasher(signing_algorithm, message_type)
try:
self.crypto_key.key.public_key().verify(signature=signature, data=data, **kwargs)
if signing_algorithm.startswith("ECDSA"):
self.crypto_key.key.public_key().verify(signature, data, ec.ECDSA(wrapped_hasher))
else:
padding = self._construct_sign_verify_padding(signing_algorithm, hasher)
self.crypto_key.key.public_key().verify(signature, data, padding, wrapped_hasher)
return True
except ValueError as exc:
raise ValidationException(str(exc))
Expand All @@ -321,11 +344,12 @@ def _get_hmac_context(self, mac_algorithm: MacAlgorithmSpec) -> hmac.HMAC:
)
return h

def _construct_sign_verify_kwargs(
def _construct_sign_verify_hasher(
self, signing_algorithm: SigningAlgorithmSpec, message_type: MessageType
) -> Dict:
kwargs = {}

) -> (
Prehashed | hashes.SHA256 | hashes.SHA384 | hashes.SHA512,
Prehashed | hashes.SHA256 | hashes.SHA384 | hashes.SHA512,
):
if "SHA_256" in signing_algorithm:
hasher = hashes.SHA256()
elif "SHA_384" in signing_algorithm:
Expand All @@ -337,27 +361,24 @@ def _construct_sign_verify_kwargs(
f"Unsupported hash type in SigningAlgorithm '{signing_algorithm}'"
)

wrapped_hasher = hasher
if message_type == MessageType.DIGEST:
kwargs["algorithm"] = utils.Prehashed(hasher)
else:
kwargs["algorithm"] = hasher

if signing_algorithm.startswith("ECDSA"):
kwargs["signature_algorithm"] = ec.ECDSA(algorithm=kwargs.pop("algorithm", None))
return kwargs
wrapped_hasher = utils.Prehashed(hasher)
return hasher, wrapped_hasher

def _construct_sign_verify_padding(
self,
signing_algorithm: SigningAlgorithmSpec,
hasher: Prehashed | hashes.SHA256 | hashes.SHA384 | hashes.SHA512,
) -> PKCS1v15 | PSS:
if signing_algorithm.startswith("RSA"):
if "PKCS" in signing_algorithm:
kwargs["padding"] = padding.PKCS1v15()
return padding.PKCS1v15()
elif "PSS" in signing_algorithm:
kwargs["padding"] = padding.PSS(
mgf=padding.MGF1(hasher), salt_length=padding.PSS.MAX_LENGTH
)
return padding.PSS(mgf=padding.MGF1(hasher), salt_length=padding.PSS.MAX_LENGTH)
else:
LOG.warning("Unsupported padding in SigningAlgorithm '%s'", signing_algorithm)

return kwargs

# Not a comment, rather some possibly relevant links for the future.
# https://docs.aws.amazon.com/kms/latest/developerguide/asymm-create-key.html
# "You cannot create an elliptic curve key pair for encryption and decryption."
Expand Down
4 changes: 2 additions & 2 deletions localstack/services/kms/provider.py
Expand Up @@ -1073,7 +1073,6 @@ def import_key_material(
enabled_key_allowed=True,
disabled_key_allowed=True,
)
self._validate_key_for_encryption_decryption(context, key_to_import_material_to)

if import_state.wrapping_algo == AlgorithmSpec.RSAES_PKCS1_V1_5:
decrypt_padding = padding.PKCS1v15()
Expand Down Expand Up @@ -1106,9 +1105,10 @@ def import_key_material(
"A validTo date must be set if the ExpirationModel is KEY_MATERIAL_EXPIRES"
)
# TODO actually set validTo and make the key expire
key_to_import_material_to.crypto_key.key_material = key_material
key_to_import_material_to.metadata["Enabled"] = True
key_to_import_material_to.metadata["KeyState"] = KeyState.Enabled
key_to_import_material_to.crypto_key.load_key_material(key_material)

return ImportKeyMaterialResponse()

def delete_imported_key_material(
Expand Down
79 changes: 75 additions & 4 deletions tests/aws/services/kms/test_kms.py
Expand Up @@ -8,7 +8,8 @@
import pytest
from botocore.config import Config
from botocore.exceptions import ClientError
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.primitives.serialization import load_der_public_key

from localstack.services.kms.utils import get_hash_algorithm
Expand Down Expand Up @@ -559,7 +560,7 @@ def test_describe_and_list_sign_key(self, kms_create_key, aws_client):
assert key_id in _get_all_key_ids(aws_client.kms)

@markers.aws.validated
def test_import_key(self, kms_create_key, aws_client, snapshot):
def test_import_key_symmetric(self, kms_create_key, aws_client, snapshot):
snapshot.add_transformer(snapshot.transform.key_value("Description"))
key = kms_create_key(Origin="EXTERNAL")
snapshot.match("created-key", key)
Expand All @@ -573,7 +574,7 @@ def test_import_key(self, kms_create_key, aws_client, snapshot):

# get key import params
params = aws_client.kms.get_parameters_for_import(
KeyId=key_id, WrappingAlgorithm="RSAES_PKCS1_V1_5", WrappingKeySpec="RSA_2048"
KeyId=key_id, WrappingAlgorithm="RSAES_OAEP_SHA_256", WrappingKeySpec="RSA_2048"
)
assert params["KeyId"] == key["Arn"]
assert params["ImportToken"]
Expand All @@ -586,7 +587,12 @@ def test_import_key(self, kms_create_key, aws_client, snapshot):

# import symmetric key (key material) into KMS
public_key = load_der_public_key(params["PublicKey"])
encrypted_key = public_key.encrypt(symmetric_key, PKCS1v15())
encrypted_key = public_key.encrypt(
symmetric_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None
),
uubk marked this conversation as resolved.
Show resolved Hide resolved
)
describe_key_before_import = aws_client.kms.describe_key(KeyId=key_id)
snapshot.match("describe-key-before-import", describe_key_before_import)

Expand Down Expand Up @@ -621,6 +627,71 @@ def test_import_key(self, kms_create_key, aws_client, snapshot):
aws_client.kms.encrypt(Plaintext=plaintext, KeyId=key_id)
snapshot.match("encrypt-after-delete-error", e.value.response)

@markers.aws.validated
def test_import_key_asymmetric(self, kms_create_key, aws_client, snapshot):
snapshot.add_transformer(snapshot.transform.key_value("Description"))
key = kms_create_key(Origin="EXTERNAL", KeySpec="ECC_NIST_P256", KeyUsage="SIGN_VERIFY")
snapshot.match("created-key", key)
key_id = key["KeyId"]

crypto_key = ec.generate_private_key(ec.SECP256R1())
raw_private_key = crypto_key.private_bytes(
serialization.Encoding.DER,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption(),
)
raw_public_key = crypto_key.public_key().public_bytes(
serialization.Encoding.DER,
serialization.PublicFormat.SubjectPublicKeyInfo,
)
plaintext = b"test content 123 !#"

# get key import params
params = aws_client.kms.get_parameters_for_import(
KeyId=key_id, WrappingAlgorithm="RSAES_OAEP_SHA_256", WrappingKeySpec="RSA_2048"
)

# import asymmetric key (key material) into KMS
public_key = load_der_public_key(params["PublicKey"])
encrypted_key = public_key.encrypt(
raw_private_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None
),
)
describe_key_before_import = aws_client.kms.describe_key(KeyId=key_id)
snapshot.match("describe-key-before-import", describe_key_before_import)

aws_client.kms.import_key_material(
KeyId=key_id,
ImportToken=params["ImportToken"],
EncryptedKeyMaterial=encrypted_key,
ExpirationModel="KEY_MATERIAL_DOES_NOT_EXPIRE",
)
describe_key_after_import = aws_client.kms.describe_key(KeyId=key_id)
snapshot.match("describe-key-after-import", describe_key_after_import)

# Check whether public key is derived correctly
get_public_key_after_import = aws_client.kms.get_public_key(KeyId=key_id)
assert get_public_key_after_import["PublicKey"] == raw_public_key

# Do a sign/verify cycle
signed_data = aws_client.kms.sign(
Message=plaintext, MessageType="RAW", SigningAlgorithm="ECDSA_SHA_256", KeyId=key_id
)
verify_data = aws_client.kms.verify(
Message=plaintext,
Signature=signed_data["Signature"],
MessageType="RAW",
SigningAlgorithm="ECDSA_SHA_256",
KeyId=key_id,
)
assert verify_data["SignatureValid"]

aws_client.kms.delete_imported_key_material(KeyId=key_id)
describe_key_after_deleted_import = aws_client.kms.describe_key(KeyId=key_id)
snapshot.match("describe-key-after-deleted-import", describe_key_after_deleted_import)

@markers.aws.validated
def test_list_aliases_of_key(self, kms_create_key, kms_create_alias, aws_client):
aliased_key_id = kms_create_key()["KeyId"]
Expand Down
100 changes: 98 additions & 2 deletions tests/aws/services/kms/test_kms.snapshot.json
Expand Up @@ -1207,8 +1207,8 @@
}
}
},
"tests/aws/services/kms/test_kms.py::TestKMS::test_import_key": {
"recorded-date": "13-04-2023, 11:30:39",
"tests/aws/services/kms/test_kms.py::TestKMS::test_import_key_symmetric": {
"recorded-date": "24-01-2024, 10:44:12",
"recorded-content": {
"created-key": {
"AWSAccountId": "111111111111",
Expand Down Expand Up @@ -1335,6 +1335,102 @@
}
}
},
"tests/aws/services/kms/test_kms.py::TestKMS::test_import_key_asymmetric": {
"recorded-date": "24-01-2024, 10:44:14",
"recorded-content": {
"created-key": {
"AWSAccountId": "111111111111",
"Arn": "arn:aws:kms:<region>:111111111111:key/<key-id:1>",
"CreationDate": "datetime",
"CustomerMasterKeySpec": "ECC_NIST_P256",
"Description": "<description:1>",
"Enabled": false,
"KeyId": "<key-id:1>",
"KeyManager": "CUSTOMER",
"KeySpec": "ECC_NIST_P256",
"KeyState": "PendingImport",
"KeyUsage": "SIGN_VERIFY",
"MultiRegion": false,
"Origin": "EXTERNAL",
"SigningAlgorithms": [
"ECDSA_SHA_256"
]
},
"describe-key-before-import": {
"KeyMetadata": {
"AWSAccountId": "111111111111",
"Arn": "arn:aws:kms:<region>:111111111111:key/<key-id:1>",
"CreationDate": "datetime",
"CustomerMasterKeySpec": "ECC_NIST_P256",
"Description": "<description:1>",
"Enabled": false,
"KeyId": "<key-id:1>",
"KeyManager": "CUSTOMER",
"KeySpec": "ECC_NIST_P256",
"KeyState": "PendingImport",
"KeyUsage": "SIGN_VERIFY",
"MultiRegion": false,
"Origin": "EXTERNAL",
"SigningAlgorithms": [
"ECDSA_SHA_256"
]
},
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
}
},
"describe-key-after-import": {
"KeyMetadata": {
"AWSAccountId": "111111111111",
"Arn": "arn:aws:kms:<region>:111111111111:key/<key-id:1>",
"CreationDate": "datetime",
"CustomerMasterKeySpec": "ECC_NIST_P256",
"Description": "<description:1>",
"Enabled": true,
"ExpirationModel": "KEY_MATERIAL_DOES_NOT_EXPIRE",
"KeyId": "<key-id:1>",
"KeyManager": "CUSTOMER",
"KeySpec": "ECC_NIST_P256",
"KeyState": "Enabled",
"KeyUsage": "SIGN_VERIFY",
"MultiRegion": false,
"Origin": "EXTERNAL",
"SigningAlgorithms": [
"ECDSA_SHA_256"
]
},
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
}
},
"describe-key-after-deleted-import": {
"KeyMetadata": {
"AWSAccountId": "111111111111",
"Arn": "arn:aws:kms:<region>:111111111111:key/<key-id:1>",
"CreationDate": "datetime",
"CustomerMasterKeySpec": "ECC_NIST_P256",
"Description": "<description:1>",
"Enabled": false,
"KeyId": "<key-id:1>",
"KeyManager": "CUSTOMER",
"KeySpec": "ECC_NIST_P256",
"KeyState": "PendingImport",
"KeyUsage": "SIGN_VERIFY",
"MultiRegion": false,
"Origin": "EXTERNAL",
"SigningAlgorithms": [
"ECDSA_SHA_256"
]
},
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
}
}
}
},
"tests/aws/services/kms/test_kms.py::TestKMS::test_error_messaging_for_invalid_keys": {
"recorded-date": "13-04-2023, 11:31:23",
"recorded-content": {
Expand Down