4 fail, 304 skipped, 2 642 pass in 1h 36m 8s
Annotations
Check warning on line 0 in tests.aws.services.events.test_events.TestEvents
github-actions / LocalStack Community integration with Pro
test_api_destinations[auth0] (tests.aws.services.events.test_events.TestEvents) failed
pytest-junit-community-1.xml [took 9s]
Raw output
IndexError: list index out of range
self = <tests.aws.services.events.test_events.TestEvents object at 0x7f818b10cb10>
httpserver = <HTTPServer host=localhost port=34929>
auth = {'key': 'BasicAuthParameters', 'parameters': {'Password': 'pass', 'Username': 'user'}, 'type': 'BASIC'}
aws_client = <localstack.aws.connect.ServiceLevelClientFactory object at 0x7f8184356fd0>
clean_up = <function clean_up.<locals>._clean_up at 0x7f8142499800>
@markers.aws.unknown
@pytest.mark.parametrize("auth", API_DESTINATION_AUTHS)
@pytest.mark.skipif(is_v2_provider(), reason="V2 provider does not support this feature yet")
def test_api_destinations(self, httpserver: HTTPServer, auth, aws_client, clean_up):
token = short_uid()
bearer = f"Bearer {token}"
def _handler(_request: Request):
return Response(
json.dumps(
{
"access_token": token,
"token_type": "Bearer",
"expires_in": 86400,
}
),
mimetype="application/json",
)
httpserver.expect_request("").respond_with_handler(_handler)
http_endpoint = httpserver.url_for("/")
if auth.get("type") == "OAUTH_CLIENT_CREDENTIALS":
auth["parameters"]["AuthorizationEndpoint"] = http_endpoint
connection_name = f"c-{short_uid()}"
connection_arn = aws_client.events.create_connection(
Name=connection_name,
AuthorizationType=auth.get("type"),
AuthParameters={
auth.get("key"): auth.get("parameters"),
"InvocationHttpParameters": {
"BodyParameters": [
{
"Key": "connection_body_param",
"Value": "value",
"IsValueSecret": False,
},
],
"HeaderParameters": [
{
"Key": "connection-header-param",
"Value": "value",
"IsValueSecret": False,
},
{
"Key": "overwritten-header",
"Value": "original",
"IsValueSecret": False,
},
],
"QueryStringParameters": [
{
"Key": "connection_query_param",
"Value": "value",
"IsValueSecret": False,
},
{
"Key": "overwritten_query",
"Value": "original",
"IsValueSecret": False,
},
],
},
},
)["ConnectionArn"]
# create api destination
dest_name = f"d-{short_uid()}"
result = aws_client.events.create_api_destination(
Name=dest_name,
ConnectionArn=connection_arn,
InvocationEndpoint=http_endpoint,
HttpMethod="POST",
)
# create rule and target
rule_name = f"r-{short_uid()}"
target_id = f"target-{short_uid()}"
pattern = json.dumps({"source": ["source-123"], "detail-type": ["type-123"]})
aws_client.events.put_rule(Name=rule_name, EventPattern=pattern)
aws_client.events.put_targets(
Rule=rule_name,
Targets=[
{
"Id": target_id,
"Arn": result["ApiDestinationArn"],
"Input": '{"target_value":"value"}',
"HttpParameters": {
"PathParameterValues": ["target_path"],
"HeaderParameters": {
"target-header": "target_header_value",
"overwritten_header": "changed",
},
"QueryStringParameters": {
"target_query": "t_query",
"overwritten_query": "changed",
},
},
}
],
)
entries = [
{
"Source": "source-123",
"DetailType": "type-123",
"Detail": '{"i": 0}',
}
]
aws_client.events.put_events(Entries=entries)
# clean up
aws_client.events.delete_connection(Name=connection_name)
aws_client.events.delete_api_destination(Name=dest_name)
clean_up(rule_name=rule_name, target_ids=target_id)
to_recv = 2 if auth["type"] == "OAUTH_CLIENT_CREDENTIALS" else 1
poll_condition(lambda: len(httpserver.log) >= to_recv, timeout=5)
> event_request, _ = httpserver.log[-1]
E IndexError: list index out of range
../localstack/tests/aws/services/events/test_events.py:480: IndexError
Check warning on line 0 in tests.aws.services.events.test_events.TestEvents
github-actions / LocalStack Community integration with Pro
test_api_destinations[auth1] (tests.aws.services.events.test_events.TestEvents) failed
pytest-junit-community-1.xml [took 14s]
Raw output
IndexError: list index out of range
self = <tests.aws.services.events.test_events.TestEvents object at 0x7f818b10d210>
httpserver = <HTTPServer host=localhost port=34929>
auth = {'key': 'ApiKeyAuthParameters', 'parameters': {'ApiKeyName': 'Api', 'ApiKeyValue': 'apikey_secret'}, 'type': 'API_KEY'}
aws_client = <localstack.aws.connect.ServiceLevelClientFactory object at 0x7f8184356fd0>
clean_up = <function clean_up.<locals>._clean_up at 0x7f81093c2700>
@markers.aws.unknown
@pytest.mark.parametrize("auth", API_DESTINATION_AUTHS)
@pytest.mark.skipif(is_v2_provider(), reason="V2 provider does not support this feature yet")
def test_api_destinations(self, httpserver: HTTPServer, auth, aws_client, clean_up):
token = short_uid()
bearer = f"Bearer {token}"
def _handler(_request: Request):
return Response(
json.dumps(
{
"access_token": token,
"token_type": "Bearer",
"expires_in": 86400,
}
),
mimetype="application/json",
)
httpserver.expect_request("").respond_with_handler(_handler)
http_endpoint = httpserver.url_for("/")
if auth.get("type") == "OAUTH_CLIENT_CREDENTIALS":
auth["parameters"]["AuthorizationEndpoint"] = http_endpoint
connection_name = f"c-{short_uid()}"
connection_arn = aws_client.events.create_connection(
Name=connection_name,
AuthorizationType=auth.get("type"),
AuthParameters={
auth.get("key"): auth.get("parameters"),
"InvocationHttpParameters": {
"BodyParameters": [
{
"Key": "connection_body_param",
"Value": "value",
"IsValueSecret": False,
},
],
"HeaderParameters": [
{
"Key": "connection-header-param",
"Value": "value",
"IsValueSecret": False,
},
{
"Key": "overwritten-header",
"Value": "original",
"IsValueSecret": False,
},
],
"QueryStringParameters": [
{
"Key": "connection_query_param",
"Value": "value",
"IsValueSecret": False,
},
{
"Key": "overwritten_query",
"Value": "original",
"IsValueSecret": False,
},
],
},
},
)["ConnectionArn"]
# create api destination
dest_name = f"d-{short_uid()}"
result = aws_client.events.create_api_destination(
Name=dest_name,
ConnectionArn=connection_arn,
InvocationEndpoint=http_endpoint,
HttpMethod="POST",
)
# create rule and target
rule_name = f"r-{short_uid()}"
target_id = f"target-{short_uid()}"
pattern = json.dumps({"source": ["source-123"], "detail-type": ["type-123"]})
aws_client.events.put_rule(Name=rule_name, EventPattern=pattern)
aws_client.events.put_targets(
Rule=rule_name,
Targets=[
{
"Id": target_id,
"Arn": result["ApiDestinationArn"],
"Input": '{"target_value":"value"}',
"HttpParameters": {
"PathParameterValues": ["target_path"],
"HeaderParameters": {
"target-header": "target_header_value",
"overwritten_header": "changed",
},
"QueryStringParameters": {
"target_query": "t_query",
"overwritten_query": "changed",
},
},
}
],
)
entries = [
{
"Source": "source-123",
"DetailType": "type-123",
"Detail": '{"i": 0}',
}
]
aws_client.events.put_events(Entries=entries)
# clean up
aws_client.events.delete_connection(Name=connection_name)
aws_client.events.delete_api_destination(Name=dest_name)
clean_up(rule_name=rule_name, target_ids=target_id)
to_recv = 2 if auth["type"] == "OAUTH_CLIENT_CREDENTIALS" else 1
poll_condition(lambda: len(httpserver.log) >= to_recv, timeout=5)
> event_request, _ = httpserver.log[-1]
E IndexError: list index out of range
../localstack/tests/aws/services/events/test_events.py:480: IndexError
Check warning on line 0 in tests.aws.services.events.test_events.TestEvents
github-actions / LocalStack Community integration with Pro
test_api_destinations[auth2] (tests.aws.services.events.test_events.TestEvents) failed
pytest-junit-community-1.xml [took 13s]
Raw output
KeyError: 'connection_body_param'
self = <tests.aws.services.events.test_events.TestEvents object at 0x7f818b10c4d0>
httpserver = <HTTPServer host=localhost port=34929>
auth = {'key': 'OAuthParameters', 'parameters': {'AuthorizationEndpoint': 'http://localhost:34929/', 'ClientParameters': {'Cl... 'value2'}], 'QueryStringParameters': [{'Key': 'oauthquery', 'Value': 'value3'}]}}, 'type': 'OAUTH_CLIENT_CREDENTIALS'}
aws_client = <localstack.aws.connect.ServiceLevelClientFactory object at 0x7f8184356fd0>
clean_up = <function clean_up.<locals>._clean_up at 0x7f814018f6a0>
@markers.aws.unknown
@pytest.mark.parametrize("auth", API_DESTINATION_AUTHS)
@pytest.mark.skipif(is_v2_provider(), reason="V2 provider does not support this feature yet")
def test_api_destinations(self, httpserver: HTTPServer, auth, aws_client, clean_up):
token = short_uid()
bearer = f"Bearer {token}"
def _handler(_request: Request):
return Response(
json.dumps(
{
"access_token": token,
"token_type": "Bearer",
"expires_in": 86400,
}
),
mimetype="application/json",
)
httpserver.expect_request("").respond_with_handler(_handler)
http_endpoint = httpserver.url_for("/")
if auth.get("type") == "OAUTH_CLIENT_CREDENTIALS":
auth["parameters"]["AuthorizationEndpoint"] = http_endpoint
connection_name = f"c-{short_uid()}"
connection_arn = aws_client.events.create_connection(
Name=connection_name,
AuthorizationType=auth.get("type"),
AuthParameters={
auth.get("key"): auth.get("parameters"),
"InvocationHttpParameters": {
"BodyParameters": [
{
"Key": "connection_body_param",
"Value": "value",
"IsValueSecret": False,
},
],
"HeaderParameters": [
{
"Key": "connection-header-param",
"Value": "value",
"IsValueSecret": False,
},
{
"Key": "overwritten-header",
"Value": "original",
"IsValueSecret": False,
},
],
"QueryStringParameters": [
{
"Key": "connection_query_param",
"Value": "value",
"IsValueSecret": False,
},
{
"Key": "overwritten_query",
"Value": "original",
"IsValueSecret": False,
},
],
},
},
)["ConnectionArn"]
# create api destination
dest_name = f"d-{short_uid()}"
result = aws_client.events.create_api_destination(
Name=dest_name,
ConnectionArn=connection_arn,
InvocationEndpoint=http_endpoint,
HttpMethod="POST",
)
# create rule and target
rule_name = f"r-{short_uid()}"
target_id = f"target-{short_uid()}"
pattern = json.dumps({"source": ["source-123"], "detail-type": ["type-123"]})
aws_client.events.put_rule(Name=rule_name, EventPattern=pattern)
aws_client.events.put_targets(
Rule=rule_name,
Targets=[
{
"Id": target_id,
"Arn": result["ApiDestinationArn"],
"Input": '{"target_value":"value"}',
"HttpParameters": {
"PathParameterValues": ["target_path"],
"HeaderParameters": {
"target-header": "target_header_value",
"overwritten_header": "changed",
},
"QueryStringParameters": {
"target_query": "t_query",
"overwritten_query": "changed",
},
},
}
],
)
entries = [
{
"Source": "source-123",
"DetailType": "type-123",
"Detail": '{"i": 0}',
}
]
aws_client.events.put_events(Entries=entries)
# clean up
aws_client.events.delete_connection(Name=connection_name)
aws_client.events.delete_api_destination(Name=dest_name)
clean_up(rule_name=rule_name, target_ids=target_id)
to_recv = 2 if auth["type"] == "OAUTH_CLIENT_CREDENTIALS" else 1
poll_condition(lambda: len(httpserver.log) >= to_recv, timeout=5)
event_request, _ = httpserver.log[-1]
event = event_request.get_json(force=True)
headers = event_request.headers
query_args = event_request.args
# Connection data validation
> assert event["connection_body_param"] == "value"
E KeyError: 'connection_body_param'
../localstack/tests/aws/services/events/test_events.py:486: KeyError
Check warning on line 0 in tests.aws.services.sns.test_sns.TestSNSSubscriptionCrud
github-actions / LocalStack Community integration with Pro
test_list_subscriptions_by_topic_pagination (tests.aws.services.sns.test_sns.TestSNSSubscriptionCrud) failed
pytest-junit-community-2.xml [took 1s]
Raw output
AssertionError: assert False
+ where False = <built-in method endswith of str object at 0x7f23eb8e76f0>('==')
+ where <built-in method endswith of str object at 0x7f23eb8e76f0> = 'YXJuOmF3czpzbnM6YXAtbm9ydGhlYXN0LTE6MDAwMDAwMDAwMDAxOnRlc3QtdG9waWMtMWRkNGZlMTU6MjBlODNiZGItOTZkYi00YmJiLWI4NjEtYzMyNjAwY2ExNGI5'.endswith
self = <tests.aws.services.sns.test_sns.TestSNSSubscriptionCrud object at 0x7f24a81f17d0>
sns_create_topic = <function sns_create_topic.<locals>._create_topic at 0x7f24817c6c00>
sns_subscription = <function sns_subscription.<locals>._create_sub at 0x7f24817c60c0>
snapshot = <localstack_snapshot.snapshots.prototype.SnapshotSession object at 0x7f2417ddd150>
aws_client = <localstack.aws.connect.ServiceLevelClientFactory object at 0x7f24a0e63a90>
@markers.aws.validated
def test_list_subscriptions_by_topic_pagination(
self, sns_create_topic, sns_subscription, snapshot, aws_client
):
# ordering of the listing seems to be not consistent, so we can transform its value
snapshot.add_transformers_list(
[
snapshot.transform.key_value("Endpoint"),
snapshot.transform.key_value("NextToken"),
]
)
base_phone_number = "+12312312"
topic_arn = sns_create_topic()["TopicArn"]
for phone_suffix in range(101):
phone_number = f"{base_phone_number}{phone_suffix}"
sns_subscription(TopicArn=topic_arn, Protocol="sms", Endpoint=phone_number)
response = aws_client.sns.list_subscriptions_by_topic(TopicArn=topic_arn)
# not snapshotting the results, it contains 100 entries
assert "NextToken" in response
# seems to be b64 encoded
> assert response["NextToken"].endswith("==")
E AssertionError: assert False
E + where False = <built-in method endswith of str object at 0x7f23eb8e76f0>('==')
E + where <built-in method endswith of str object at 0x7f23eb8e76f0> = 'YXJuOmF3czpzbnM6YXAtbm9ydGhlYXN0LTE6MDAwMDAwMDAwMDAxOnRlc3QtdG9waWMtMWRkNGZlMTU6MjBlODNiZGItOTZkYi00YmJiLWI4NjEtYzMyNjAwY2ExNGI5'.endswith
../localstack/tests/aws/services/sns/test_sns.py:846: AssertionError