Skip to content

Commit 0a1116d

Browse files
1101-1 and aquamatthias authored
[aws][fix] Fix parallel collection (#2071)
Co-authored-by: Matthias Veit <matthias_veit@yahoo.de> Co-authored-by: Matthias Veit <aquamatthias@users.noreply.github.com>
1 parent e46e21f commit 0a1116d

40 files changed

+1688
-2259
lines changed

fixlib/fixlib/baseresources.py

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -148,16 +148,18 @@ def __str__(self) -> str:
148148

149149
# load balancers
150150
RequestCount = "request" # _count will be added to the end because of the unit
151-
ActiveConnection = "active_connection"
151+
ActiveConnectionCount = "active_connection" # _count will be added to the end because of the unit
152+
ALBActiveConnectionCount = "alb_active_connection" # _count will be added to the end because of the unit
152153
ConnectionAttemptCount = "connection_attempt" # _count will be added to the end because of the unit
153154
ConnectionEstablishedCount = "connection_established" # _count will be added to the end because of the unit
154155
StatusCode2XX = "status_code_2xx"
155156
StatusCode4XX = "status_code_4xx"
156157
StatusCode5XX = "status_code_5xx"
157158
Latency = "latency"
159+
TargetResponseTime = "target_response_time"
158160
ProcessedBytes = "processed" # _bytes will be added to the end because of the unit
159161
HealthyHostCount = "healthy_host" # _count will be added to the end because of the unit
160-
UnhealthyHostCount = "unhealthy_host" # _count will be added to the end because of the unit
162+
UnHealthyHostCount = "unhealthy_host" # _count will be added to the end because of the unit
161163
HealthyStateRouting = "healthy_state_routing"
162164
UnhealthyStateRouting = "unhealthy_state_routing"
163165
HealthyStateDNS = "healthy_state_dns"

plugins/aws/fix_plugin_aws/collector.py

Lines changed: 51 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,10 @@
1+
from collections import defaultdict
12
import logging
2-
from attrs import define
33
from concurrent.futures import Future, ThreadPoolExecutor
4-
from typing import List, Type, Optional, ClassVar, Union
5-
from datetime import datetime, timezone
4+
from datetime import datetime, timedelta, timezone
5+
from typing import List, Type, Optional, ClassVar, Union, cast
6+
7+
from attrs import define
68

79
from fix_plugin_aws.aws_client import AwsClient
810
from fix_plugin_aws.configuration import AwsConfig
@@ -46,16 +48,14 @@
4648
waf,
4749
)
4850
from fix_plugin_aws.resource.base import AwsAccount, AwsApiSpec, AwsRegion, AwsResource, GraphBuilder
49-
5051
from fixlib.baseresources import Cloud, EdgeType, BaseOrganizationalRoot, BaseOrganizationalUnit
5152
from fixlib.core.actions import CoreFeedback, ErrorAccumulator
5253
from fixlib.core.progress import ProgressDone, ProgressTree
5354
from fixlib.graph import Graph, BySearchCriteria, ByNodeId
55+
from fixlib.json import value_in_path
5456
from fixlib.proc import set_thread_name
5557
from fixlib.threading import ExecutorQueue, GatherFutures
5658
from fixlib.types import Json
57-
from fixlib.json import value_in_path
58-
5959
from .utils import global_region_by_partition
6060

6161
log = logging.getLogger("fix.plugins.aws")
@@ -193,6 +193,7 @@ def get_last_run() -> Optional[datetime]:
193193
self.cloud,
194194
self.account,
195195
self.global_region,
196+
{r.id: r for r in self.regions},
196197
self.client,
197198
shared_queue,
198199
self.core_feedback,
@@ -218,6 +219,16 @@ def get_last_run() -> Optional[datetime]:
218219
self.collect_resources(regional_resources, global_builder.for_region(region))
219220
shared_queue.wait_for_submitted_work()
220221

222+
if global_builder.config.collect_usage_metrics:
223+
try:
224+
log.info(f"[Aws:{self.account.id}] Collect usage metrics.")
225+
self.collect_usage_metrics(global_builder)
226+
except Exception as e:
227+
log.warning(
228+
f"Failed to collect usage metrics on account {self.account.id} in region {global_builder.region.id}: {e}"
229+
)
230+
shared_queue.wait_for_submitted_work()
231+
221232
# connect nodes
222233
log.info(f"[Aws:{self.account.id}] Connect resources and create edges.")
223234
for node, data in list(self.graph.nodes(data=True)):
@@ -281,6 +292,40 @@ def work_done(_: Future[None]) -> None:
281292
when_done.add_done_callback(work_done)
282293
return when_done
283294

295+
def collect_usage_metrics(self, builder: GraphBuilder) -> None:
296+
metrics_queries = defaultdict(list)
297+
two_hours = timedelta(hours=2)
298+
thirty_minutes = timedelta(minutes=30)
299+
lookup_map = {}
300+
for resource in builder.graph.nodes:
301+
if not isinstance(resource, AwsResource):
302+
continue
303+
# region can be overridden in the query: s3 is global, but need to be queried per region
304+
if region := cast(AwsRegion, resource.region()):
305+
lookup_map[resource.id] = resource
306+
resource_queries: List[cloudwatch.AwsCloudwatchQuery] = resource.collect_usage_metrics(builder)
307+
for query in resource_queries:
308+
query_region = query.region or region
309+
start = query.start_delta or builder.metrics_delta
310+
if query.period and query.period < thirty_minutes:
311+
start = min(start, two_hours)
312+
metrics_queries[(query_region, start)].append(query)
313+
for (region, start), queries in metrics_queries.items():
314+
315+
def collect_and_set_metrics(
316+
start: timedelta, region: AwsRegion, queries: List[cloudwatch.AwsCloudwatchQuery]
317+
) -> None:
318+
start_at = builder.created_at - start
319+
try:
320+
result = cloudwatch.AwsCloudwatchMetricData.query_for_multiple(
321+
builder.for_region(region), start_at, builder.created_at, queries
322+
)
323+
cloudwatch.update_resource_metrics(lookup_map, result)
324+
except Exception as e:
325+
log.warning(f"Error occurred in region {region}: {e}")
326+
327+
builder.submit_work("cloudwatch", collect_and_set_metrics, start, region, queries)
328+
284329
# TODO: move into separate AwsAccountSettings
285330
def update_account(self) -> None:
286331
log.info(f"Collecting AWS IAM Account Summary in account {self.account.dname}")

plugins/aws/fix_plugin_aws/resource/apigateway.py

Lines changed: 42 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -513,64 +513,56 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]:
513513
]
514514

515515
@classmethod
516-
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
517-
instances: List[AwsResource] = []
516+
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> None:
517+
def add_instance(api_instance: AwsResource) -> None:
518+
for deployment in builder.client.list(service_name, "get-deployments", "items", restApiId=api_instance.id):
519+
if deploy_instance := AwsApiGatewayDeployment.from_api(deployment, builder):
520+
deploy_instance.set_arn(
521+
builder=builder,
522+
account="",
523+
resource=f"/restapis/{api_instance.id}/deployments/{deploy_instance.id}",
524+
)
525+
deploy_instance.api_link = api_instance.id
526+
builder.add_node(deploy_instance, deployment)
527+
builder.add_edge(api_instance, EdgeType.default, node=deploy_instance)
528+
for stage in builder.client.list(
529+
service_name,
530+
"get-stages",
531+
"item",
532+
restApiId=api_instance.id,
533+
deploymentId=deploy_instance.id,
534+
):
535+
stage["syntheticId"] = f'{api_instance.id}_{stage["stageName"]}' # create unique id
536+
if stage_instance := AwsApiGatewayStage.from_api(stage, builder):
537+
stage_instance.api_link = api_instance.id
538+
builder.add_node(stage_instance, stage)
539+
# reference kinds for this edge are maintained in AwsApiGatewayDeployment.reference_kinds # noqa: E501
540+
builder.add_edge(deploy_instance, EdgeType.default, node=stage_instance)
541+
for authorizer in builder.client.list(service_name, "get-authorizers", "items", restApiId=api_instance.id):
542+
if auth_instance := AwsApiGatewayAuthorizer.from_api(authorizer, builder):
543+
auth_instance.api_link = api_instance.id
544+
builder.add_node(auth_instance, authorizer)
545+
builder.add_edge(api_instance, EdgeType.default, node=auth_instance)
546+
for resource in builder.client.list(service_name, "get-resources", "items", restApiId=api_instance.id):
547+
if resource_instance := AwsApiGatewayResource.from_api(resource, builder):
548+
resource_instance.api_link = api_instance.id
549+
if resource_instance.resource_methods:
550+
for method in resource_instance.resource_methods:
551+
mapped = bend(AwsApiGatewayMethod.mapping, resource["resourceMethods"][method])
552+
if gm := parse_json(mapped, AwsApiGatewayMethod, builder):
553+
resource_instance.resource_methods[method] = gm
554+
builder.add_node(resource_instance, resource)
555+
builder.add_edge(api_instance, EdgeType.default, node=resource_instance)
556+
518557
for js in json:
519558
if api_instance := cls.from_api(js, builder):
520559
api_instance.set_arn(
521560
builder=builder,
522561
account="",
523562
resource=f"/restapis/{api_instance.id}",
524563
)
525-
instances.append(api_instance)
526564
builder.add_node(api_instance, js)
527-
for deployment in builder.client.list(
528-
service_name, "get-deployments", "items", restApiId=api_instance.id
529-
):
530-
if deploy_instance := AwsApiGatewayDeployment.from_api(deployment, builder):
531-
deploy_instance.set_arn(
532-
builder=builder,
533-
account="",
534-
resource=f"/restapis/{api_instance.id}/deployments/{deploy_instance.id}",
535-
)
536-
deploy_instance.api_link = api_instance.id
537-
instances.append(deploy_instance)
538-
builder.add_node(deploy_instance, deployment)
539-
builder.add_edge(api_instance, EdgeType.default, node=deploy_instance)
540-
for stage in builder.client.list(
541-
service_name,
542-
"get-stages",
543-
"item",
544-
restApiId=api_instance.id,
545-
deploymentId=deploy_instance.id,
546-
):
547-
stage["syntheticId"] = f'{api_instance.id}_{stage["stageName"]}' # create unique id
548-
if stage_instance := AwsApiGatewayStage.from_api(stage, builder):
549-
stage_instance.api_link = api_instance.id
550-
instances.append(stage_instance)
551-
builder.add_node(stage_instance, stage)
552-
# reference kinds for this edge are maintained in AwsApiGatewayDeployment.reference_kinds # noqa: E501
553-
builder.add_edge(deploy_instance, EdgeType.default, node=stage_instance)
554-
for authorizer in builder.client.list(
555-
service_name, "get-authorizers", "items", restApiId=api_instance.id
556-
):
557-
if auth_instance := AwsApiGatewayAuthorizer.from_api(authorizer, builder):
558-
auth_instance.api_link = api_instance.id
559-
instances.append(auth_instance)
560-
builder.add_node(auth_instance, authorizer)
561-
builder.add_edge(api_instance, EdgeType.default, node=auth_instance)
562-
for resource in builder.client.list(service_name, "get-resources", "items", restApiId=api_instance.id):
563-
if resource_instance := AwsApiGatewayResource.from_api(resource, builder):
564-
resource_instance.api_link = api_instance.id
565-
instances.append(resource_instance)
566-
if resource_instance.resource_methods:
567-
for method in resource_instance.resource_methods:
568-
mapped = bend(AwsApiGatewayMethod.mapping, resource["resourceMethods"][method])
569-
if gm := parse_json(mapped, AwsApiGatewayMethod, builder):
570-
resource_instance.resource_methods[method] = gm
571-
builder.add_node(resource_instance, resource)
572-
builder.add_edge(api_instance, EdgeType.default, node=resource_instance)
573-
return instances
565+
builder.submit_work(service_name, add_instance, api_instance)
574566

575567
def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
576568
if self.api_endpoint_configuration:

plugins/aws/fix_plugin_aws/resource/athena.py

Lines changed: 9 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
from typing import ClassVar, Dict, Optional, List, Any
22
from typing import Type
33

4+
45
from attrs import define
56

67
from fix_plugin_aws.aws_client import AwsClient
@@ -133,27 +134,18 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]:
133134
]
134135

135136
@classmethod
136-
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
137-
workgroups: List[AwsResource] = []
138-
139-
def fetch_workgroup(name: str) -> Optional[AwsAthenaWorkGroup]:
137+
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> None:
138+
def fetch_workgroup(name: str) -> None:
140139
result = builder.client.get(
141140
aws_service=service_name, action="get-work-group", result_name="WorkGroup", WorkGroup=name
142141
)
143-
if result is None:
144-
return None
145-
146-
if workgroup := AwsAthenaWorkGroup.from_api(result, builder):
142+
if result and (workgroup := AwsAthenaWorkGroup.from_api(result, builder)):
147143
workgroup.set_arn(
148144
builder=builder,
149145
resource=f"workgroup/{workgroup.name}",
150146
)
151-
workgroups.append(workgroup)
152147
builder.add_node(workgroup, result)
153148
builder.submit_work(service_name, add_tags, workgroup)
154-
return workgroup
155-
else:
156-
return None
157149

158150
def add_tags(data_catalog: AwsAthenaWorkGroup) -> None:
159151
tags = builder.client.list(
@@ -167,8 +159,7 @@ def add_tags(data_catalog: AwsAthenaWorkGroup) -> None:
167159

168160
for js in json:
169161
if (name := js.get("Name")) is not None and isinstance(name, str):
170-
fetch_workgroup(name)
171-
return workgroups
162+
builder.submit_work(service_name, fetch_workgroup, name)
172163

173164
# noinspection PyUnboundLocalVariable
174165
def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
@@ -252,25 +243,18 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]:
252243
]
253244

254245
@classmethod
255-
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
256-
catalogs: List[AwsResource] = []
257-
258-
def fetch_data_catalog(data_catalog_name: str) -> Optional[AwsAthenaDataCatalog]:
246+
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> None:
247+
def fetch_data_catalog(data_catalog_name: str) -> None:
259248
result = builder.client.get(
260249
aws_service=service_name,
261250
action="get-data-catalog",
262251
result_name="DataCatalog",
263252
Name=data_catalog_name,
264253
)
265-
if result is None:
266-
return None
267-
if catalog := AwsAthenaDataCatalog.from_api(result, builder):
254+
if result and (catalog := AwsAthenaDataCatalog.from_api(result, builder)):
268255
catalog.set_arn(builder=builder, resource=f"datacatalog/{catalog.name}")
269-
catalogs.append(catalog)
270256
builder.add_node(catalog, result)
271257
builder.submit_work(service_name, add_tags, catalog)
272-
return catalog
273-
return None
274258

275259
def add_tags(data_catalog: AwsAthenaDataCatalog) -> None:
276260
tags = builder.client.list(
@@ -285,8 +269,7 @@ def add_tags(data_catalog: AwsAthenaDataCatalog) -> None:
285269
for js in json:
286270
# we filter out the default data catalog as it is not possible to do much with it
287271
if (name := js.get("CatalogName")) is not None and isinstance(name, str) and name != "AwsDataCatalog":
288-
fetch_data_catalog(name)
289-
return catalogs
272+
builder.submit_work(service_name, fetch_data_catalog, name)
290273

291274
def update_resource_tag(self, client: AwsClient, key: str, value: str) -> bool:
292275
client.call(

0 commit comments

Comments
 (0)