Skip to content

Commit

Permalink
[aws][fix] Fix SQS handling (#1382)
Browse files Browse the repository at this point in the history
* [aws][fix] Fix SQS handling

* correct import
  • Loading branch information
aquamatthias committed Jan 27, 2023
1 parent 2b6f2c7 commit cedab1d
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 39 deletions.
51 changes: 31 additions & 20 deletions plugins/aws/resoto_plugin_aws/resource/sqs.py
Expand Up @@ -7,11 +7,22 @@
from resoto_plugin_aws.resource.base import AwsApiSpec, AwsResource, GraphBuilder
from resoto_plugin_aws.resource.kms import AwsKmsKey
from resotolib.baseresources import ModelReference
from resotolib.json_bender import F, Bender, S
from resotolib.json_bender import F, Bender, S, AsInt, AsBool, Bend, ParseJson
from resotolib.types import Json
from resotolib.utils import utc_str


@define(eq=False, slots=False)
class AwsSqsRedrivePolicy:
kind: ClassVar[str] = "aws_sqs_redrive_policy"
mapping: ClassVar[Dict[str, Bender]] = {
"dead_letter_target_arn": S("deadLetterTargetArn"),
"max_receive_count": S("maxReceiveCount"),
}
dead_letter_target_arn: Optional[str] = None
max_receive_count: Optional[int] = None


@define(eq=False, slots=False)
class AwsSqsQueue(AwsResource):
kind: ClassVar[str] = "aws_sqs_queue"
Expand All @@ -23,35 +34,35 @@ class AwsSqsQueue(AwsResource):
mapping: ClassVar[Dict[str, Bender]] = {
"id": S("QueueName"),
"name": S("QueueName"),
"ctime": S("CreatedTimestamp") >> F(lambda x: utc_str(datetime.utcfromtimestamp(x))),
"mtime": S("LastModifiedTimestamp") >> F(lambda x: utc_str(datetime.utcfromtimestamp(x))),
"ctime": S("CreatedTimestamp") >> AsInt() >> F(lambda x: utc_str(datetime.utcfromtimestamp(x))),
"mtime": S("LastModifiedTimestamp") >> AsInt() >> F(lambda x: utc_str(datetime.utcfromtimestamp(x))),
"arn": S("QueueArn"),
"sqs_queue_url": S("QueueUrl"),
"sqs_approximate_number_of_messages": S("ApproximateNumberOfMessages"),
"sqs_approximate_number_of_messages_not_visible": S("ApproximateNumberOfMessagesNotVisible"),
"sqs_approximate_number_of_messages_delayed": S("ApproximateNumberOfMessagesDelayed"),
"sqs_policy": S("Policy"),
"sqs_redrive_policy": S("RedrivePolicy"),
"sqs_approximate_number_of_messages": S("ApproximateNumberOfMessages") >> AsInt(),
"sqs_approximate_number_of_messages_not_visible": S("ApproximateNumberOfMessagesNotVisible") >> AsInt(),
"sqs_approximate_number_of_messages_delayed": S("ApproximateNumberOfMessagesDelayed") >> AsInt(),
"sqs_policy": S("Policy") >> ParseJson(keys_to_snake=True),
"sqs_redrive_policy": S("RedrivePolicy") >> ParseJson() >> Bend(AwsSqsRedrivePolicy.mapping),
"sqs_fifo_queue": S("FifoQueue"),
"sqs_content_based_deduplication": S("ContentBasedDeduplication"),
"sqs_content_based_deduplication": S("ContentBasedDeduplication") >> AsBool(),
"sqs_kms_master_key_id": S("KmsMasterKeyId"),
"sqs_kms_data_key_reuse_period_seconds": S("KmsDataKeyReusePeriodSeconds"),
"sqs_kms_data_key_reuse_period_seconds": S("KmsDataKeyReusePeriodSeconds") >> AsInt(),
"sqs_deduplication_scope": S("DeduplicationScope"),
"sqs_fifo_throughput_limit": S("FifoThroughputLimit"),
"sqs_redrive_allow_policy": S("RedriveAllowPolicy"),
"sqs_visibility_timeout": S("VisibilityTimeout"),
"sqs_maximum_message_size": S("MaximumMessageSize"),
"sqs_message_retention_period": S("MessageRetentionPeriod"),
"sqs_delay_seconds": S("DelaySeconds"),
"sqs_receive_message_wait_time_seconds": S("ReceiveMessageWaitTimeSeconds"),
"sqs_managed_sse_enabled": S("SqsManagedSseEnabled"),
"sqs_redrive_allow_policy": S("RedriveAllowPolicy") >> ParseJson() >> S("redrivePermission"),
"sqs_visibility_timeout": S("VisibilityTimeout") >> AsInt(),
"sqs_maximum_message_size": S("MaximumMessageSize") >> AsInt(),
"sqs_message_retention_period": S("MessageRetentionPeriod") >> AsInt(),
"sqs_delay_seconds": S("DelaySeconds") >> AsInt(),
"sqs_receive_message_wait_time_seconds": S("ReceiveMessageWaitTimeSeconds") >> AsInt(),
"sqs_managed_sse_enabled": S("SqsManagedSseEnabled") >> AsBool(),
}
sqs_queue_url: Optional[str] = field(default=None)
sqs_approximate_number_of_messages: Optional[int] = field(default=None)
sqs_approximate_number_of_messages_not_visible: Optional[int] = field(default=None)
sqs_approximate_number_of_messages_delayed: Optional[int] = field(default=None)
sqs_policy: Optional[str] = field(default=None)
sqs_redrive_policy: Optional[str] = field(default=None)
sqs_policy: Optional[Json] = field(default=None)
sqs_redrive_policy: Optional[AwsSqsRedrivePolicy] = field(default=None)
sqs_fifo_queue: Optional[bool] = field(default=None)
sqs_content_based_deduplication: Optional[bool] = field(default=None)
sqs_kms_master_key_id: Optional[str] = field(default=None)
Expand Down Expand Up @@ -84,7 +95,7 @@ def add_instance(queue_url: str) -> None:
builder.submit_work(add_tags, instance)

def add_tags(queue: AwsSqsQueue) -> None:
tags = builder.client.get("sqs", "list-queue-tags", result_name="Tags", QueueUrl=[queue.sqs_queue_url])
tags = builder.client.get("sqs", "list-queue-tags", result_name="Tags", QueueUrl=queue.sqs_queue_url)
if tags:
queue.tags = tags

Expand Down
Expand Up @@ -2,25 +2,27 @@
"Attributes":
{
"QueueArn": "arn:aws:sqs:eu-central-1:882347060974:SomeName",
"ApproximateNumberOfMessages": 0,
"ApproximateNumberOfMessagesNotVisible": 0,
"ApproximateNumberOfMessagesDelayed": 0,
"CreatedTimestamp": 1659601278,
"LastModifiedTimestamp": 1659601278,
"Policy": "SomeSQSPolicy",
"RedrivePolicy": "SomeRedrivePolicy",
"FifoQueue": true,
"ContentBasedDeduplication": false,
"ApproximateNumberOfMessages": "0",
"ApproximateNumberOfMessagesNotVisible": "0",
"ApproximateNumberOfMessagesDelayed": "0",
"CreatedTimestamp": "1659601278",
"LastModifiedTimestamp": "1659601278",
"Policy": "{\"Version\":\"2008-10-17\",\"Id\":\"__default_policy_ID\",\"Statement\":[{\"Sid\":\"__owner_statement\",\"Effect\":\"Allow\",\"Principal\":{\"AWS\":\"arn:aws:iam::882347060974:root\"},\"Action\":\"SQS:*\",\"Resource\":\"arn:aws:sqs:eu-central-1:882347060974:DeleteMeQueue\"}]}",
"RedrivePolicy": "{ \"deadLetterTargetArn\": \"arn:aws:sqs:eu-central-1:882347060974:SomeName\", \"maxReceiveCount\": 42 }",
"FifoQueue": "true",
"ContentBasedDeduplication": "false",
"KmsMasterKeyId": "SomeKeyId",
"KmsDataKeyReusePeriodSeconds": 42,
"KmsDataKeyReusePeriodSeconds": "42",
"DeduplicationScope": "messageGroup",
"FifoThroughputLimit": "perQueue",
"RedriveAllowPolicy": "SomeRedriveAllowPolicy",
"VisibilityTimeout": 30,
"MaximumMessageSize": 262144,
"MessageRetentionPeriod": 345600,
"DelaySeconds": 0,
"ReceiveMessageWaitTimeSeconds": 0,
"SqsManagedSseEnabled": false
"RedriveAllowPolicy": "{\"redrivePermission\":\"allowAll\"}",
"VisibilityTimeout": "30",
"MaximumMessageSize": "262144",
"MessageRetentionPeriod": "345600",
"DelaySeconds": "0",
"ReceiveMessageWaitTimeSeconds": "0",
"SqsManagedSseEnabled": "false"
}
}
}


51 changes: 50 additions & 1 deletion resotolib/resotolib/json_bender.py
@@ -1,11 +1,15 @@
from __future__ import annotations

import json
import logging
from abc import ABC
from datetime import datetime
from enum import Enum
from typing import Dict, Any, Type, Union, Optional, Callable

from resotolib.types import Json
from jsons import snakecase

from resotolib.types import Json, JsonElement
from resotolib.units import parse

log = logging.getLogger("resoto." + __name__)
Expand Down Expand Up @@ -329,6 +333,51 @@ def execute(self, source: Any) -> Any:
return source


class AsInt(Bender):
def execute(self, source: Any) -> Any:
if isinstance(source, int):
return source
else:
try:
return int(source)
except Exception:
return None


class AsBool(Bender):
def execute(self, source: Any) -> Any:
if isinstance(source, bool):
return source
elif isinstance(source, str):
return source.lower() in ("true", "yes", "1")
else:
return bool(source)


class ParseJson(Bender):
def __init__(self, keys_to_snake: bool = False, **kwargs: Any):
super().__init__(**kwargs)
self._keys_to_snake = keys_to_snake

def execute(self, source: Any) -> Any:
def reformat_keys_to_snake(js: JsonElement) -> JsonElement:
if isinstance(js, dict):
return {snakecase(k): reformat_keys_to_snake(v) for k, v in js.items()}
elif isinstance(js, list):
return [reformat_keys_to_snake(v) for v in js]
else:
return js

if isinstance(source, str):
try:
result = json.loads(source)
return result if not self._keys_to_snake else reformat_keys_to_snake(result)
except Exception:
return None
else:
return None


class AsDate(Bender):
"""
Parse a given input string as date.
Expand Down

0 comments on commit cedab1d

Please sign in to comment.