Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 42 additions & 72 deletions localstack/testing/pytest/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import time
from typing import Any, Callable, Dict, List, Optional, Tuple

import boto3
import botocore.auth
import botocore.config
import botocore.credentials
Expand All @@ -23,7 +22,6 @@
from werkzeug import Request, Response

from localstack import config, constants
from localstack.aws.accounts import get_aws_account_id
from localstack.constants import TEST_AWS_ACCESS_KEY_ID, TEST_AWS_SECRET_ACCESS_KEY
from localstack.services.stores import (
AccountRegionBundle,
Expand All @@ -35,7 +33,6 @@
from localstack.testing.aws.cloudformation_utils import load_template_file, render_template
from localstack.testing.aws.util import get_lambda_logs, is_aws_cloud
from localstack.utils import testutil
from localstack.utils.aws import aws_stack
from localstack.utils.aws.client import SigningHttpClient
from localstack.utils.aws.resources import create_dynamodb_table
from localstack.utils.collections import ensure_list
Expand All @@ -53,47 +50,8 @@
PUBLIC_HTTP_ECHO_SERVER_URL = "http://httpbin.org"


def _resource(service):
if os.environ.get("TEST_TARGET") == "AWS_CLOUD":
return boto3.resource(service)
# can't set the timeouts to 0 like in the AWS CLI because the underlying http client requires values > 0
config = (
botocore.config.Config(
connect_timeout=1_000, read_timeout=1_000, retries={"total_max_attempts": 1}
)
if os.environ.get("TEST_DISABLE_RETRIES_AND_TIMEOUTS")
else None
)
return aws_stack.connect_to_resource_external(service, config=config)


# TODO: remove usage of this fixture
@pytest.fixture(scope="class")
def create_boto_client(aws_client_factory):
def _factory_client(
service, region_name=None, aws_access_key_id=None, *, additional_config=None
):
return aws_client_factory.get_client(
service_name=service, region_name=region_name, config=additional_config
)

return _factory_client


@pytest.fixture(scope="class")
def boto3_session():
if os.environ.get("TEST_TARGET", "") == "AWS_CLOUD":
return boto3.Session()

return boto3.Session(
# LocalStack assumes AWS_ACCESS_KEY_ID config contains the AWS_ACCOUNT_ID value.
aws_access_key_id=get_aws_account_id(),
aws_secret_access_key="__test_key__",
)


@pytest.fixture(scope="class")
def aws_http_client_factory(boto3_session):
def aws_http_client_factory(aws_session):
"""
Returns a factory for creating new ``SigningHttpClient`` instances using a configurable botocore request signer.
The default signer is a SigV4QueryAuth. The credentials are extracted from the ``boto3_sessions`` fixture that
Expand All @@ -120,24 +78,22 @@ def factory(
aws_access_key_id: str = None,
aws_secret_access_key: str = None,
):
region = region or boto3_session.region_name
region = region or aws_session.region_name
region = region or config.DEFAULT_REGION

if aws_access_key_id or aws_secret_access_key:
credentials = botocore.credentials.Credentials(
access_key=aws_access_key_id, secret_key=aws_secret_access_key
)
else:
credentials = boto3_session.get_credentials()
credentials = aws_session.get_credentials()

creds = credentials.get_frozen_credentials()

if not endpoint_url:
if os.environ.get("TEST_TARGET", "") == "AWS_CLOUD":
# FIXME: this is a bit raw. we should probably re-use boto in a better way
resolver: EndpointResolver = boto3_session._session.get_component(
"endpoint_resolver"
)
resolver: EndpointResolver = aws_session._session.get_component("endpoint_resolver")
endpoint_url = "https://" + resolver.construct_endpoint(service, region)["hostname"]
else:
endpoint_url = config.get_edge_url()
Expand All @@ -147,11 +103,6 @@ def factory(
return factory


@pytest.fixture(scope="class")
def dynamodb_resource():
return _resource("dynamodb")


@pytest.fixture(scope="class")
def s3_vhost_client(aws_client_factory):
return aws_client_factory(config=botocore.config.Config(s3={"addressing_style": "virtual"})).s3
Expand All @@ -165,11 +116,6 @@ def s3_presigned_client(aws_client_factory):
).s3


@pytest.fixture(scope="class")
def s3_resource():
return _resource("s3")


@pytest.fixture
def dynamodb_wait_for_table_active(aws_client):
def wait_for_table_active(table_name: str, client=None):
Expand Down Expand Up @@ -237,7 +183,7 @@ def factory(**kwargs):


@pytest.fixture
def s3_create_bucket(s3_resource, aws_client):
def s3_create_bucket(s3_empty_bucket, aws_client):
buckets = []

def factory(**kwargs) -> str:
Expand All @@ -261,10 +207,8 @@ def factory(**kwargs) -> str:
# cleanup
for bucket in buckets:
try:
bucket = s3_resource.Bucket(bucket)
bucket.objects.all().delete()
bucket.object_versions.all().delete()
bucket.delete()
s3_empty_bucket(bucket)
aws_client.s3.delete_bucket(Bucket=bucket)
except Exception as e:
LOG.debug("error cleaning up bucket %s: %s", bucket, e)

Expand All @@ -278,6 +222,29 @@ def s3_bucket(s3_create_bucket, aws_client) -> str:
return s3_create_bucket(**kwargs)


@pytest.fixture
def s3_empty_bucket(aws_client):
"""
Returns a factory that given a bucket name, deletes all objects and deletes all object versions
"""
# Boto resource would make this a straightforward task, but our internal client does not support Boto resource
# FIXME: this won't work when bucket has more than 1000 objects
def factory(bucket_name: str):
response = aws_client.s3.list_objects_v2(Bucket=bucket_name)
objects = [{"Key": obj["Key"]} for obj in response["Contents"]]
aws_client.s3.delete_objects(Bucket=bucket_name, Delete={"Objects": objects})

response = aws_client.s3.list_object_versions(Bucket=bucket_name)
object_versions = [
{"Key": obj["Key"], "VersionId": obj["VersionId"]}
for obj in response.get("Versions", [])
]
if object_versions:
aws_client.s3.delete_objects(Bucket=bucket_name, Delete={"Objects": object_versions})

yield factory
Comment on lines +225 to +245
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new fixture replaces the functionality offered by the Boto S3 Resource

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably prefer this to be a normal utility over a fixture but should be ok to leave as-is for now 👍



@pytest.fixture
def sqs_create_queue(aws_client):
queue_urls = []
Expand Down Expand Up @@ -693,22 +660,25 @@ def is_stream_ready():


@pytest.fixture()
def kms_create_key(create_boto_client):
def kms_create_key(aws_client_factory):
key_ids = []

def _create_key(region=None, **kwargs):
def _create_key(region_name: str = None, **kwargs):
if "Description" not in kwargs:
kwargs["Description"] = f"test description - {short_uid()}"
key_metadata = create_boto_client("kms", region).create_key(**kwargs)["KeyMetadata"]
key_ids.append((region, key_metadata["KeyId"]))
key_metadata = aws_client_factory(region_name=region_name).kms.create_key(**kwargs)[
"KeyMetadata"
]
key_ids.append((region_name, key_metadata["KeyId"]))
return key_metadata

yield _create_key

for region, key_id in key_ids:
for region_name, key_id in key_ids:
try:

# shortest amount of time you can schedule the deletion
create_boto_client("kms", region).schedule_key_deletion(
aws_client_factory(region_name=region_name).kms.schedule_key_deletion(
KeyId=key_id, PendingWindowInDays=7
)
except Exception as e:
Expand All @@ -722,20 +692,20 @@ def _create_key(region=None, **kwargs):


@pytest.fixture()
def kms_replicate_key(create_boto_client):
def kms_replicate_key(aws_client_factory):
key_ids = []

def _replicate_key(region_from=None, **kwargs):
region_to = kwargs.get("ReplicaRegion")
key_ids.append((region_to, kwargs.get("KeyId")))
return create_boto_client("kms", region_from).replicate_key(**kwargs)
return aws_client_factory(region_name=region_from).kms.replicate_key(**kwargs)

yield _replicate_key

for region_to, key_id in key_ids:
try:
# shortest amount of time you can schedule the deletion
create_boto_client("kms", region_to).schedule_key_deletion(
aws_client_factory(region_name=region_to).kms.schedule_key_deletion(
KeyId=key_id, PendingWindowInDays=7
)
except Exception as e:
Expand Down
26 changes: 26 additions & 0 deletions localstack/testing/snapshots/transformer_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,32 @@ def s3_notifications_transformer():
),
]

@staticmethod
def s3_dynamodb_notifications():
return [
TransformerUtility.jsonpath("$..uuid.S", "uuid"),
TransformerUtility.jsonpath("$..M.requestParameters.M.sourceIPAddress.S", "ip-address"),
TransformerUtility.jsonpath(
"$..M.responseElements.M.x-amz-id-2.S", "amz-id", reference_replacement=False
),
TransformerUtility.jsonpath(
"$..M.responseElements.M.x-amz-request-id.S",
"amz-request-id",
reference_replacement=False,
),
TransformerUtility.jsonpath("$..M.s3.M.bucket.M.name.S", "bucket-name"),
TransformerUtility.jsonpath("$..M.s3.M.bucket.M.arn.S", "bucket-arn"),
TransformerUtility.jsonpath(
"$..M.s3.M.bucket.M.ownerIdentity.M.principalId.S", "principal-id"
),
TransformerUtility.jsonpath("$..M.s3.M.configurationId.S", "config-id"),
TransformerUtility.jsonpath("$..M.s3.M.object.M.key.S", "object-key"),
TransformerUtility.jsonpath(
"$..M.s3.M.object.M.sequencer.S", "sequencer", reference_replacement=False
),
TransformerUtility.jsonpath("$..M.userIdentity.M.principalId.S", "principal-id"),
]

@staticmethod
def kinesis_api():
"""
Expand Down
10 changes: 10 additions & 0 deletions localstack/utils/aws/aws_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import re
import socket
import threading
import warnings
from functools import lru_cache
from typing import Dict, Optional, Union

Expand Down Expand Up @@ -226,6 +227,10 @@ def connect_to_resource(
"""
Generic method to obtain an AWS service resource using boto3, based on environment, region, or custom endpoint_url.
"""
warnings.warn(
"`connect_to_resource` is obsolete. Use `localstack.aws.connect`", DeprecationWarning
)

return connect_to_service(
service_name,
client=False,
Expand All @@ -248,6 +253,11 @@ def connect_to_resource_external(
"""
Generic method to obtain an AWS service resource using boto3, based on environment, region, or custom endpoint_url.
"""
warnings.warn(
"`connect_to_resource_external` is obsolete. Use `localstack.aws.connect`",
DeprecationWarning,
)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this PR, these utilities are no longer used in the codebase but I've added these warnings so that they're properly highlighted in the IDE.

Btw, Python 3.12 will have a decorator to do exactly this https://peps.python.org/pep-0702/

return create_external_boto_client(
service_name,
client=False,
Expand Down
1 change: 1 addition & 0 deletions localstack/utils/aws/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def create_s3_bucket(bucket_name: str, s3_client=None):
return s3_client.create_bucket(Bucket=bucket_name, **kwargs)


# TODO: convert this into a fixture?
def create_dynamodb_table(
table_name: str,
partition_key: str,
Expand Down
8 changes: 3 additions & 5 deletions tests/integration/apigateway/test_apigateway_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
def test_rest_api_to_dynamodb_integration(
ddb_action,
dynamodb_create_table,
dynamodb_resource,
create_rest_api_with_integration,
snapshot,
aws_client,
Expand All @@ -45,10 +44,9 @@ def test_rest_api_to_dynamodb_integration(
table_name = table["TableName"]

# insert items
dynamodb_table = dynamodb_resource.Table(table_name)
item_ids = ("test", "test2", "test 3")
for item_id in item_ids:
dynamodb_table.put_item(Item={"id": item_id})
aws_client.dynamodb.put_item(TableName=table_name, Item={"id": {"S": item_id}})
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client needs the items to be typed. The Boto resource had an intrinsic type conversion so it wasn't necessary earlier


# construct request mapping template
if ddb_action == "PutItem":
Expand Down Expand Up @@ -97,8 +95,8 @@ def _invoke_with_retries(id_param=None):
if ddb_action == "PutItem":
result = _invoke_with_retries("test-new")
snapshot.match("result-put-item", result)
result = dynamodb_table.scan()
result["Items"] = sorted(result["Items"], key=lambda x: x["id"])
result = aws_client.dynamodb.scan(TableName=table_name)
result["Items"] = sorted(result["Items"], key=lambda x: x["id"]["S"])
snapshot.match("result-scan", result)

elif ddb_action == "Query":
Expand Down
Loading