Skip to content

Commit 718fa3e

Browse files
1101-1aquamatthias
andauthored
[feat][aws] Add more resources to done in parallel (#2066)
Co-authored-by: Matthias Veit <matthias_veit@yahoo.de>
1 parent 3355f87 commit 718fa3e

File tree

21 files changed

+573
-325
lines changed

21 files changed

+573
-325
lines changed

plugins/aws/fix_plugin_aws/resource/apigateway.py

Lines changed: 56 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from datetime import datetime
22
from typing import ClassVar, Dict, Optional, List, Type, Union, Any
3-
3+
from concurrent.futures import wait as futures_wait
44
from attrs import define, field
55
from fix_plugin_aws.aws_client import AwsClient
66

@@ -514,63 +514,69 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]:
514514

515515
@classmethod
516516
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
517-
instances: List[AwsResource] = []
517+
api_instances: List[AwsResource] = []
518+
519+
def add_instance(api_instance: AwsResource) -> List[AwsResource]:
520+
instances: List[AwsResource] = []
521+
for deployment in builder.client.list(service_name, "get-deployments", "items", restApiId=api_instance.id):
522+
if deploy_instance := AwsApiGatewayDeployment.from_api(deployment, builder):
523+
deploy_instance.set_arn(
524+
builder=builder,
525+
account="",
526+
resource=f"/restapis/{api_instance.id}/deployments/{deploy_instance.id}",
527+
)
528+
deploy_instance.api_link = api_instance.id
529+
instances.append(deploy_instance)
530+
builder.add_node(deploy_instance, deployment)
531+
builder.add_edge(api_instance, EdgeType.default, node=deploy_instance)
532+
for stage in builder.client.list(
533+
service_name,
534+
"get-stages",
535+
"item",
536+
restApiId=api_instance.id,
537+
deploymentId=deploy_instance.id,
538+
):
539+
stage["syntheticId"] = f'{api_instance.id}_{stage["stageName"]}' # create unique id
540+
if stage_instance := AwsApiGatewayStage.from_api(stage, builder):
541+
stage_instance.api_link = api_instance.id
542+
instances.append(stage_instance)
543+
builder.add_node(stage_instance, stage)
544+
# reference kinds for this edge are maintained in AwsApiGatewayDeployment.reference_kinds # noqa: E501
545+
builder.add_edge(deploy_instance, EdgeType.default, node=stage_instance)
546+
for authorizer in builder.client.list(service_name, "get-authorizers", "items", restApiId=api_instance.id):
547+
if auth_instance := AwsApiGatewayAuthorizer.from_api(authorizer, builder):
548+
auth_instance.api_link = api_instance.id
549+
instances.append(auth_instance)
550+
builder.add_node(auth_instance, authorizer)
551+
builder.add_edge(api_instance, EdgeType.default, node=auth_instance)
552+
for resource in builder.client.list(service_name, "get-resources", "items", restApiId=api_instance.id):
553+
if resource_instance := AwsApiGatewayResource.from_api(resource, builder):
554+
resource_instance.api_link = api_instance.id
555+
instances.append(resource_instance)
556+
if resource_instance.resource_methods:
557+
for method in resource_instance.resource_methods:
558+
mapped = bend(AwsApiGatewayMethod.mapping, resource["resourceMethods"][method])
559+
if gm := parse_json(mapped, AwsApiGatewayMethod, builder):
560+
resource_instance.resource_methods[method] = gm
561+
builder.add_node(resource_instance, resource)
562+
builder.add_edge(api_instance, EdgeType.default, node=resource_instance)
563+
return instances
564+
565+
futures = []
518566
for js in json:
519567
if api_instance := cls.from_api(js, builder):
520568
api_instance.set_arn(
521569
builder=builder,
522570
account="",
523571
resource=f"/restapis/{api_instance.id}",
524572
)
525-
instances.append(api_instance)
573+
api_instances.append(api_instance)
526574
builder.add_node(api_instance, js)
527-
for deployment in builder.client.list(
528-
service_name, "get-deployments", "items", restApiId=api_instance.id
529-
):
530-
if deploy_instance := AwsApiGatewayDeployment.from_api(deployment, builder):
531-
deploy_instance.set_arn(
532-
builder=builder,
533-
account="",
534-
resource=f"/restapis/{api_instance.id}/deployments/{deploy_instance.id}",
535-
)
536-
deploy_instance.api_link = api_instance.id
537-
instances.append(deploy_instance)
538-
builder.add_node(deploy_instance, deployment)
539-
builder.add_edge(api_instance, EdgeType.default, node=deploy_instance)
540-
for stage in builder.client.list(
541-
service_name,
542-
"get-stages",
543-
"item",
544-
restApiId=api_instance.id,
545-
deploymentId=deploy_instance.id,
546-
):
547-
stage["syntheticId"] = f'{api_instance.id}_{stage["stageName"]}' # create unique id
548-
if stage_instance := AwsApiGatewayStage.from_api(stage, builder):
549-
stage_instance.api_link = api_instance.id
550-
instances.append(stage_instance)
551-
builder.add_node(stage_instance, stage)
552-
# reference kinds for this edge are maintained in AwsApiGatewayDeployment.reference_kinds # noqa: E501
553-
builder.add_edge(deploy_instance, EdgeType.default, node=stage_instance)
554-
for authorizer in builder.client.list(
555-
service_name, "get-authorizers", "items", restApiId=api_instance.id
556-
):
557-
if auth_instance := AwsApiGatewayAuthorizer.from_api(authorizer, builder):
558-
auth_instance.api_link = api_instance.id
559-
instances.append(auth_instance)
560-
builder.add_node(auth_instance, authorizer)
561-
builder.add_edge(api_instance, EdgeType.default, node=auth_instance)
562-
for resource in builder.client.list(service_name, "get-resources", "items", restApiId=api_instance.id):
563-
if resource_instance := AwsApiGatewayResource.from_api(resource, builder):
564-
resource_instance.api_link = api_instance.id
565-
instances.append(resource_instance)
566-
if resource_instance.resource_methods:
567-
for method in resource_instance.resource_methods:
568-
mapped = bend(AwsApiGatewayMethod.mapping, resource["resourceMethods"][method])
569-
if gm := parse_json(mapped, AwsApiGatewayMethod, builder):
570-
resource_instance.resource_methods[method] = gm
571-
builder.add_node(resource_instance, resource)
572-
builder.add_edge(api_instance, EdgeType.default, node=resource_instance)
573-
return instances
575+
future = builder.submit_work(service_name, add_instance, api_instance)
576+
futures.append(future)
577+
futures_wait(futures)
578+
instances: List[AwsResource] = [result for future in futures for result in future.result()]
579+
return api_instances + instances
574580

575581
def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
576582
if self.api_endpoint_configuration:

plugins/aws/fix_plugin_aws/resource/athena.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing import ClassVar, Dict, Optional, List, Any
22
from typing import Type
3+
from concurrent.futures import wait as futures_wait
34

45
from attrs import define
56

@@ -134,8 +135,6 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]:
134135

135136
@classmethod
136137
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
137-
workgroups: List[AwsResource] = []
138-
139138
def fetch_workgroup(name: str) -> Optional[AwsAthenaWorkGroup]:
140139
result = builder.client.get(
141140
aws_service=service_name, action="get-work-group", result_name="WorkGroup", WorkGroup=name
@@ -148,7 +147,6 @@ def fetch_workgroup(name: str) -> Optional[AwsAthenaWorkGroup]:
148147
builder=builder,
149148
resource=f"workgroup/{workgroup.name}",
150149
)
151-
workgroups.append(workgroup)
152150
builder.add_node(workgroup, result)
153151
builder.submit_work(service_name, add_tags, workgroup)
154152
return workgroup
@@ -165,9 +163,13 @@ def add_tags(data_catalog: AwsAthenaWorkGroup) -> None:
165163
if tags:
166164
data_catalog.tags = bend(ToDict(), tags)
167165

166+
futures = []
168167
for js in json:
169168
if (name := js.get("Name")) is not None and isinstance(name, str):
170-
fetch_workgroup(name)
169+
future = builder.submit_work(service_name, fetch_workgroup, name)
170+
futures.append(future)
171+
futures_wait(futures)
172+
workgroups: List[AwsResource] = [result for future in futures if (result := future.result())]
171173
return workgroups
172174

173175
# noinspection PyUnboundLocalVariable
@@ -253,8 +255,6 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]:
253255

254256
@classmethod
255257
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
256-
catalogs: List[AwsResource] = []
257-
258258
def fetch_data_catalog(data_catalog_name: str) -> Optional[AwsAthenaDataCatalog]:
259259
result = builder.client.get(
260260
aws_service=service_name,
@@ -266,7 +266,6 @@ def fetch_data_catalog(data_catalog_name: str) -> Optional[AwsAthenaDataCatalog]
266266
return None
267267
if catalog := AwsAthenaDataCatalog.from_api(result, builder):
268268
catalog.set_arn(builder=builder, resource=f"datacatalog/{catalog.name}")
269-
catalogs.append(catalog)
270269
builder.add_node(catalog, result)
271270
builder.submit_work(service_name, add_tags, catalog)
272271
return catalog
@@ -282,10 +281,14 @@ def add_tags(data_catalog: AwsAthenaDataCatalog) -> None:
282281
if tags:
283282
data_catalog.tags = bend(S("Tags", default=[]) >> ToDict(), tags[0])
284283

284+
futures = []
285285
for js in json:
286286
# we filter out the default data catalog as it is not possible to do much with it
287287
if (name := js.get("CatalogName")) is not None and isinstance(name, str) and name != "AwsDataCatalog":
288-
fetch_data_catalog(name)
288+
future = builder.submit_work(service_name, fetch_data_catalog, name)
289+
futures.append(future)
290+
futures_wait(futures)
291+
catalogs: List[AwsResource] = [result for future in futures if (result := future.result())]
289292
return catalogs
290293

291294
def update_resource_tag(self, client: AwsClient, key: str, value: str) -> bool:

plugins/aws/fix_plugin_aws/resource/cloudtrail.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ def collect_tags(trail: AwsCloudTrail) -> None:
257257
builder.add_deferred_edge(
258258
builder.region, EdgeType.default, f'is(aws_cloud_trail) and reported.arn=="{arn}"'
259259
)
260-
futures_wait(futures) # only continue, when all task definitions are collected
260+
futures_wait(futures)
261261
instances: List[AwsResource] = [result for future in futures if (result := future.result())]
262262
return instances
263263

plugins/aws/fix_plugin_aws/resource/cognito.py

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from attrs import define, field
22
from typing import ClassVar, Dict, List, Optional, Type, Tuple, Any
3+
from concurrent.futures import wait as futures_wait
4+
35
from fix_plugin_aws.aws_client import AwsClient
46
from fix_plugin_aws.resource.base import AwsApiSpec, AwsResource, GraphBuilder
57
from fix_plugin_aws.resource.iam import AwsIamRole
@@ -243,32 +245,41 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]:
243245

244246
@classmethod
245247
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
246-
instances: List[AwsResource] = []
248+
pool_instances: List[AwsResource] = []
247249

248250
def add_tags(pool: AwsCognitoUserPool) -> None:
249251
tags = builder.client.get(service_name, "list-tags-for-resource", "Tags", ResourceArn=pool.arn)
250252
if tags:
251253
pool.tags = tags
252254

255+
def fetch_additional_instances(pool_instance: AwsCognitoUser) -> List[AwsResource]:
256+
instances: List[AwsResource] = []
257+
for user in builder.client.list(service_name, "list-users", "Users", UserPoolId=pool_instance.id):
258+
if user_instance := AwsCognitoUser.from_api(user, builder):
259+
user_instance.pool_name = pool_instance.name
260+
user_instance._pool_id = pool_instance.id
261+
instances.append(user_instance)
262+
builder.add_node(user_instance, user)
263+
builder.add_edge(from_node=pool_instance, edge_type=EdgeType.default, node=user_instance)
264+
for group in builder.client.list(service_name, "list-groups", "Groups", UserPoolId=pool_instance.id):
265+
if group_instance := AwsCognitoGroup.from_api(group, builder):
266+
instances.append(group_instance)
267+
builder.add_node(group_instance, group)
268+
builder.add_edge(from_node=pool_instance, edge_type=EdgeType.default, node=group_instance)
269+
return instances
270+
271+
futures = []
253272
for pool in json:
254273
if pool_instance := cls.from_api(pool, builder):
255274
pool_instance.set_arn(builder=builder, resource=f"userpool/{pool_instance.id}")
256-
instances.append(pool_instance)
275+
pool_instances.append(pool_instance)
257276
builder.add_node(pool_instance, pool)
258277
builder.submit_work(service_name, add_tags, pool_instance)
259-
for user in builder.client.list(service_name, "list-users", "Users", UserPoolId=pool_instance.id):
260-
if user_instance := AwsCognitoUser.from_api(user, builder):
261-
user_instance.pool_name = pool_instance.name
262-
user_instance._pool_id = pool_instance.id
263-
instances.append(user_instance)
264-
builder.add_node(user_instance, user)
265-
builder.add_edge(from_node=pool_instance, edge_type=EdgeType.default, node=user_instance)
266-
for group in builder.client.list(service_name, "list-groups", "Groups", UserPoolId=pool_instance.id):
267-
if group_instance := AwsCognitoGroup.from_api(group, builder):
268-
instances.append(group_instance)
269-
builder.add_node(group_instance, group)
270-
builder.add_edge(from_node=pool_instance, edge_type=EdgeType.default, node=group_instance)
271-
return instances
278+
future = builder.submit_work(service_name, fetch_additional_instances, pool_instance)
279+
futures.append(future)
280+
futures_wait(futures)
281+
user_and_groups: List[AwsResource] = [result for future in futures for result in future.result()]
282+
return pool_instances + user_and_groups
272283

273284
def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
274285
if self.lambda_config:

plugins/aws/fix_plugin_aws/resource/dynamodb.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from datetime import datetime
22
from typing import ClassVar, Dict, List, Optional, Type, Any
33
from attrs import define, field
4+
from concurrent.futures import Future, wait as futures_wait
45
from fix_plugin_aws.aws_client import AwsClient
56
from fix_plugin_aws.resource.base import AwsApiSpec, AwsResource, GraphBuilder
67
from fix_plugin_aws.resource.kinesis import AwsKinesisStream
@@ -401,24 +402,27 @@ def called_collect_apis(cls) -> List[AwsApiSpec]:
401402

402403
@classmethod
403404
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
404-
instances = []
405-
406-
def add_instance(table: str) -> None:
405+
def add_instance(table: str) -> Optional[AwsResource]:
407406
table_description = builder.client.get(service_name, "describe-table", "Table", TableName=table)
408407
if table_description is not None:
409408
if instance := cls.from_api(table_description, builder):
410-
instances.append(instance)
411409
builder.add_node(instance, table_description)
412410
builder.submit_work(service_name, add_tags, instance)
411+
return instance
412+
return None
413413

414414
def add_tags(table: AwsDynamoDbTable) -> None:
415415
tags = builder.client.list(service_name, "list-tags-of-resource", "Tags", ResourceArn=table.arn)
416416
if tags:
417417
table.tags = bend(ToDict(), tags)
418418

419+
futures: List[Future[Optional[AwsResource]]] = []
419420
for js in json:
420421
if isinstance(js, str):
421-
add_instance(js)
422+
future = builder.submit_work(service_name, add_instance, js)
423+
futures.append(future)
424+
futures_wait(futures)
425+
instances: List[AwsResource] = [result for future in futures if (result := future.result())]
422426
return instances
423427

424428
def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
@@ -489,9 +493,7 @@ def called_collect_apis(cls) -> List[AwsApiSpec]:
489493

490494
@classmethod
491495
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
492-
instances = []
493-
494-
def add_instance(table: Dict[str, str]) -> None:
496+
def add_instance(table: Dict[str, str]) -> Optional[AwsResource]:
495497
table_description = builder.client.get(
496498
service_name,
497499
"describe-global-table",
@@ -500,17 +502,22 @@ def add_instance(table: Dict[str, str]) -> None:
500502
)
501503
if table_description:
502504
if instance := cls.from_api(table_description, builder):
503-
instances.append(instance)
504505
builder.add_node(instance, table_description)
505506
builder.submit_work(service_name, add_tags, instance)
507+
return instance
508+
return None
506509

507510
def add_tags(table: AwsDynamoDbGlobalTable) -> None:
508511
tags = builder.client.list(service_name, "list-tags-of-resource", "Tags", ResourceArn=table.arn)
509512
if tags:
510513
table.tags = bend(ToDict(), tags)
511514

515+
futures = []
512516
for js in json:
513-
add_instance(js)
517+
future = builder.submit_work(service_name, add_instance, js)
518+
futures.append(future)
519+
futures_wait(futures)
520+
instances: List[AwsResource] = [result for future in futures if (result := future.result())]
514521
return instances
515522

516523
def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:

0 commit comments

Comments
 (0)