Skip to content

Commit

Permalink
fix deprecated secret versions preservation in secretsmanager (#10572)
Browse files Browse the repository at this point in the history
  • Loading branch information
macnev2013 committed Apr 9, 2024
1 parent 09c23de commit 307b190
Show file tree
Hide file tree
Showing 4 changed files with 372 additions and 17 deletions.
62 changes: 46 additions & 16 deletions localstack/services/secretsmanager/provider.py
Expand Up @@ -84,6 +84,9 @@
AWSPREVIOUS: Final[str] = "AWSPREVIOUS"
AWSPENDING: Final[str] = "AWSPENDING"
AWSCURRENT: Final[str] = "AWSCURRENT"
# The maximum number of outdated versions that can be stored in the secret.
# see: https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_PutSecretValue.html
MAX_OUTDATED_SECRET_VERSIONS: Final[int] = 100
#
# Error Messages.
AWS_INVALID_REQUEST_MESSAGE_CREATE_WITH_SCHEDULED_DELETION: Final[str] = (
Expand Down Expand Up @@ -262,9 +265,10 @@ def list_secret_version_ids(
self, context: RequestContext, request: ListSecretVersionIdsRequest
) -> ListSecretVersionIdsResponse:
secret_id = request["SecretId"]
include_deprecated = request.get("IncludeDeprecated", False)
self._raise_if_invalid_secret_id(secret_id)
backend = SecretsmanagerProvider.get_moto_backend_for_resource(secret_id, context)
secrets = backend.list_secret_version_ids(secret_id)
secrets = backend.list_secret_version_ids(secret_id, include_deprecated=include_deprecated)
return ListSecretVersionIdsResponse(**json.loads(secrets))

@handler("PutResourcePolicy", expand=False)
Expand Down Expand Up @@ -480,7 +484,9 @@ def moto_smb_create_secret(fn, self, name, *args, **kwargs):


@patch(SecretsManagerBackend.list_secret_version_ids)
def moto_smb_list_secret_version_ids(_, self, secret_id, *args, **kwargs):
def moto_smb_list_secret_version_ids(
_, self, secret_id: str, include_deprecated: bool, *args, **kwargs
):
if secret_id not in self.secrets:
raise SecretNotFoundException()

Expand All @@ -496,18 +502,24 @@ def moto_smb_list_secret_version_ids(_, self, secret_id, *args, **kwargs):
versions: list[SecretVersionsListEntry] = list()
for version_id, version in secret.versions.items():
version_stages = version["version_stages"]
entry = SecretVersionsListEntry(
CreatedDate=version["createdate"],
VersionId=version_id,
VersionStages=version_stages,
)
# Patch: include deprecated versions if include_deprecated is True.
# version_stages is empty if the version is deprecated.
# see: https://docs.aws.amazon.com/secretsmanager/latest/userguide/getting-started.html#term_version
if len(version_stages) > 0 or include_deprecated:
entry = SecretVersionsListEntry(
CreatedDate=version["createdate"],
VersionId=version_id,
)

# Patch: bind LastAccessedDate if one exists for this version.
last_accessed_date = version.get("last_accessed_date")
if last_accessed_date:
entry["LastAccessedDate"] = last_accessed_date
if version_stages:
entry["VersionStages"] = version_stages

versions.append(entry)
# Patch: bind LastAccessedDate if one exists for this version.
last_accessed_date = version.get("last_accessed_date")
if last_accessed_date:
entry["LastAccessedDate"] = last_accessed_date

versions.append(entry)

# Patch: sort versions by date.
versions.sort(key=lambda v: v["CreatedDate"], reverse=True)
Expand Down Expand Up @@ -634,12 +646,20 @@ def backend_update_secret_version_stage(
def fake_secret_reset_default_version(fn, self, secret_version, version_id):
fn(self, secret_version, version_id)

# Remove versions with no version stages.
versions_no_stages = [
# Remove versions with no version stages, if max limit of outdated versions is exceeded.
versions_no_stages: list[str] = [
version_id for version_id, version in self.versions.items() if not version["version_stages"]
]
for version_no_stages in versions_no_stages:
del self.versions[version_no_stages]
versions_to_delete: list[str] = []

# Patch: remove outdated versions if the max deprecated versions limit is exceeded.
if len(versions_no_stages) >= MAX_OUTDATED_SECRET_VERSIONS:
versions_to_delete = versions_no_stages[
: len(versions_no_stages) - MAX_OUTDATED_SECRET_VERSIONS
]

for version_to_delete in versions_to_delete:
del self.versions[version_to_delete]


@patch(FakeSecret.remove_version_stages_from_old_versions)
Expand Down Expand Up @@ -802,6 +822,16 @@ def moto_secret_not_found_exception_init(fn, self):
self.code = 400


@patch(FakeSecret._form_version_ids_to_stages, pass_target=False)
def _form_version_ids_to_stages_modal(self):
version_id_to_stages: dict[str, list] = {}
for key, value in self.versions.items():
# Patch: include version_stages in the response only if it is not empty.
if len(value["version_stages"]) > 0:
version_id_to_stages[key] = value["version_stages"]
return version_id_to_stages


# patching resource policy in moto
def get_resource_policy_model(self, secret_id):
if self._is_valid_identifier(secret_id):
Expand Down
80 changes: 79 additions & 1 deletion tests/aws/services/secretsmanager/test_secretsmanager.py
Expand Up @@ -709,7 +709,6 @@ def test_update_secret_description(self, sm_snapshot, secret_name, aws_client):
sm_snapshot.match("describe_secret_res_0", describe_secret_res_0)

description_v1 = "MyDescription"
#
update_secret_res_0 = aws_client.secretsmanager.update_secret(
SecretId=secret_name, Description=description_v1
)
Expand Down Expand Up @@ -1037,6 +1036,85 @@ def test_update_secret_version_stages_current_pending_cycle_custom_stages_1(
)
sm_snapshot.match("delete_secret_res_0", delete_secret_res_0)

@markers.snapshot.skip_snapshot_verify(paths=["$..Versions..KmsKeyIds"])
@markers.aws.validated
def test_deprecated_secret_version_stage(
self, secret_name, create_secret, aws_client, sm_snapshot
):
response = create_secret(
Name=secret_name,
SecretString="original",
Description="My secret",
)
sm_snapshot.add_transformers_list(
sm_snapshot.transform.secretsmanager_secret_id_arn(response, 0)
)
sm_snapshot.match("create_secret", response)
self._wait_created_is_listed(aws_client.secretsmanager, secret_name)

response = aws_client.secretsmanager.list_secret_version_ids(SecretId=secret_name)
sm_snapshot.match("list_secret_version_ids", response)

response = aws_client.secretsmanager.put_secret_value(
SecretId=secret_name, SecretString="update1"
)
sm_snapshot.match("put_secret_value_1", response)

response = aws_client.secretsmanager.list_secret_version_ids(SecretId=secret_name)
sm_snapshot.match("list_secret_version_ids_1", response)

response = aws_client.secretsmanager.put_secret_value(
SecretId=secret_name, SecretString="update2"
)
sm_snapshot.match("put_secret_value_2", response)

response = aws_client.secretsmanager.list_secret_version_ids(SecretId=secret_name)
sm_snapshot.match("list_secret_version_ids_2", response)

response = aws_client.secretsmanager.list_secret_version_ids(
SecretId=secret_name, IncludeDeprecated=True
)
sm_snapshot.match("list_secret_version_ids_3", response)

response = aws_client.secretsmanager.put_secret_value(
SecretId=secret_name, SecretString="update3"
)
sm_snapshot.match("put_secret_value_3", response)

response = aws_client.secretsmanager.list_secret_version_ids(SecretId=secret_name)
sm_snapshot.match("list_secret_version_ids_4", response)

response = aws_client.secretsmanager.list_secret_version_ids(
SecretId=secret_name, IncludeDeprecated=True
)
sm_snapshot.match("list_secret_version_ids_5", response)

@markers.aws.only_localstack
def test_deprecated_secret_version(self, secret_name, create_secret, aws_client):
"""
This test ensures the version cleanup behavior in a simulated AWS environment.
Secrets Manager typically retains a maximum of 100 versions and does not
immediately delete versions created within the last 24 hours.
However, this test operates under the assumption that version timestamps are not evaluated,
and the cleanup process solely depends on reaching a version count threshold.
"""
create_secret(Name=secret_name, SecretString="original", Description="My secret")
self._wait_created_is_listed(aws_client.secretsmanager, secret_name)

for i in range(130):
aws_client.secretsmanager.put_secret_value(
SecretId=secret_name, SecretString=f"update{i}"
)
response = aws_client.secretsmanager.list_secret_version_ids(
SecretId=secret_name, IncludeDeprecated=True
)
# In Secrets Manager, versions of secrets without labels are considered deprecated.
# There will be two labeled versions:
# - The current version, labeled AWSCURRENT
# - The previous version, labeled AWSPREVIOUS
# see: https://docs.aws.amazon.com/secretsmanager/latest/userguide/getting-started.html#term_version
assert len(response["Versions"]) == 102

@markers.snapshot.skip_snapshot_verify(paths=["$..KmsKeyId", "$..KmsKeyIds"])
@markers.aws.validated
def test_update_secret_version_stages_current_pending_cycle_custom_stages_2(
Expand Down

0 comments on commit 307b190

Please sign in to comment.