Skip to content

Commit ff760a0

Browse files
authored
Revert "[feat][aws] Add more resources to done in parallel (#2066)" (#2070)
1 parent 718fa3e commit ff760a0

File tree

21 files changed

+325
-573
lines changed

21 files changed

+325
-573
lines changed

plugins/aws/fix_plugin_aws/resource/apigateway.py

Lines changed: 50 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from datetime import datetime
22
from typing import ClassVar, Dict, Optional, List, Type, Union, Any
3-
from concurrent.futures import wait as futures_wait
3+
44
from attrs import define, field
55
from fix_plugin_aws.aws_client import AwsClient
66

@@ -514,69 +514,63 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]:
514514

515515
@classmethod
516516
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
517-
api_instances: List[AwsResource] = []
518-
519-
def add_instance(api_instance: AwsResource) -> List[AwsResource]:
520-
instances: List[AwsResource] = []
521-
for deployment in builder.client.list(service_name, "get-deployments", "items", restApiId=api_instance.id):
522-
if deploy_instance := AwsApiGatewayDeployment.from_api(deployment, builder):
523-
deploy_instance.set_arn(
524-
builder=builder,
525-
account="",
526-
resource=f"/restapis/{api_instance.id}/deployments/{deploy_instance.id}",
527-
)
528-
deploy_instance.api_link = api_instance.id
529-
instances.append(deploy_instance)
530-
builder.add_node(deploy_instance, deployment)
531-
builder.add_edge(api_instance, EdgeType.default, node=deploy_instance)
532-
for stage in builder.client.list(
533-
service_name,
534-
"get-stages",
535-
"item",
536-
restApiId=api_instance.id,
537-
deploymentId=deploy_instance.id,
538-
):
539-
stage["syntheticId"] = f'{api_instance.id}_{stage["stageName"]}' # create unique id
540-
if stage_instance := AwsApiGatewayStage.from_api(stage, builder):
541-
stage_instance.api_link = api_instance.id
542-
instances.append(stage_instance)
543-
builder.add_node(stage_instance, stage)
544-
# reference kinds for this edge are maintained in AwsApiGatewayDeployment.reference_kinds # noqa: E501
545-
builder.add_edge(deploy_instance, EdgeType.default, node=stage_instance)
546-
for authorizer in builder.client.list(service_name, "get-authorizers", "items", restApiId=api_instance.id):
547-
if auth_instance := AwsApiGatewayAuthorizer.from_api(authorizer, builder):
548-
auth_instance.api_link = api_instance.id
549-
instances.append(auth_instance)
550-
builder.add_node(auth_instance, authorizer)
551-
builder.add_edge(api_instance, EdgeType.default, node=auth_instance)
552-
for resource in builder.client.list(service_name, "get-resources", "items", restApiId=api_instance.id):
553-
if resource_instance := AwsApiGatewayResource.from_api(resource, builder):
554-
resource_instance.api_link = api_instance.id
555-
instances.append(resource_instance)
556-
if resource_instance.resource_methods:
557-
for method in resource_instance.resource_methods:
558-
mapped = bend(AwsApiGatewayMethod.mapping, resource["resourceMethods"][method])
559-
if gm := parse_json(mapped, AwsApiGatewayMethod, builder):
560-
resource_instance.resource_methods[method] = gm
561-
builder.add_node(resource_instance, resource)
562-
builder.add_edge(api_instance, EdgeType.default, node=resource_instance)
563-
return instances
564-
565-
futures = []
517+
instances: List[AwsResource] = []
566518
for js in json:
567519
if api_instance := cls.from_api(js, builder):
568520
api_instance.set_arn(
569521
builder=builder,
570522
account="",
571523
resource=f"/restapis/{api_instance.id}",
572524
)
573-
api_instances.append(api_instance)
525+
instances.append(api_instance)
574526
builder.add_node(api_instance, js)
575-
future = builder.submit_work(service_name, add_instance, api_instance)
576-
futures.append(future)
577-
futures_wait(futures)
578-
instances: List[AwsResource] = [result for future in futures for result in future.result()]
579-
return api_instances + instances
527+
for deployment in builder.client.list(
528+
service_name, "get-deployments", "items", restApiId=api_instance.id
529+
):
530+
if deploy_instance := AwsApiGatewayDeployment.from_api(deployment, builder):
531+
deploy_instance.set_arn(
532+
builder=builder,
533+
account="",
534+
resource=f"/restapis/{api_instance.id}/deployments/{deploy_instance.id}",
535+
)
536+
deploy_instance.api_link = api_instance.id
537+
instances.append(deploy_instance)
538+
builder.add_node(deploy_instance, deployment)
539+
builder.add_edge(api_instance, EdgeType.default, node=deploy_instance)
540+
for stage in builder.client.list(
541+
service_name,
542+
"get-stages",
543+
"item",
544+
restApiId=api_instance.id,
545+
deploymentId=deploy_instance.id,
546+
):
547+
stage["syntheticId"] = f'{api_instance.id}_{stage["stageName"]}' # create unique id
548+
if stage_instance := AwsApiGatewayStage.from_api(stage, builder):
549+
stage_instance.api_link = api_instance.id
550+
instances.append(stage_instance)
551+
builder.add_node(stage_instance, stage)
552+
# reference kinds for this edge are maintained in AwsApiGatewayDeployment.reference_kinds # noqa: E501
553+
builder.add_edge(deploy_instance, EdgeType.default, node=stage_instance)
554+
for authorizer in builder.client.list(
555+
service_name, "get-authorizers", "items", restApiId=api_instance.id
556+
):
557+
if auth_instance := AwsApiGatewayAuthorizer.from_api(authorizer, builder):
558+
auth_instance.api_link = api_instance.id
559+
instances.append(auth_instance)
560+
builder.add_node(auth_instance, authorizer)
561+
builder.add_edge(api_instance, EdgeType.default, node=auth_instance)
562+
for resource in builder.client.list(service_name, "get-resources", "items", restApiId=api_instance.id):
563+
if resource_instance := AwsApiGatewayResource.from_api(resource, builder):
564+
resource_instance.api_link = api_instance.id
565+
instances.append(resource_instance)
566+
if resource_instance.resource_methods:
567+
for method in resource_instance.resource_methods:
568+
mapped = bend(AwsApiGatewayMethod.mapping, resource["resourceMethods"][method])
569+
if gm := parse_json(mapped, AwsApiGatewayMethod, builder):
570+
resource_instance.resource_methods[method] = gm
571+
builder.add_node(resource_instance, resource)
572+
builder.add_edge(api_instance, EdgeType.default, node=resource_instance)
573+
return instances
580574

581575
def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
582576
if self.api_endpoint_configuration:

plugins/aws/fix_plugin_aws/resource/athena.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from typing import ClassVar, Dict, Optional, List, Any
22
from typing import Type
3-
from concurrent.futures import wait as futures_wait
43

54
from attrs import define
65

@@ -135,6 +134,8 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]:
135134

136135
@classmethod
137136
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
137+
workgroups: List[AwsResource] = []
138+
138139
def fetch_workgroup(name: str) -> Optional[AwsAthenaWorkGroup]:
139140
result = builder.client.get(
140141
aws_service=service_name, action="get-work-group", result_name="WorkGroup", WorkGroup=name
@@ -147,6 +148,7 @@ def fetch_workgroup(name: str) -> Optional[AwsAthenaWorkGroup]:
147148
builder=builder,
148149
resource=f"workgroup/{workgroup.name}",
149150
)
151+
workgroups.append(workgroup)
150152
builder.add_node(workgroup, result)
151153
builder.submit_work(service_name, add_tags, workgroup)
152154
return workgroup
@@ -163,13 +165,9 @@ def add_tags(data_catalog: AwsAthenaWorkGroup) -> None:
163165
if tags:
164166
data_catalog.tags = bend(ToDict(), tags)
165167

166-
futures = []
167168
for js in json:
168169
if (name := js.get("Name")) is not None and isinstance(name, str):
169-
future = builder.submit_work(service_name, fetch_workgroup, name)
170-
futures.append(future)
171-
futures_wait(futures)
172-
workgroups: List[AwsResource] = [result for future in futures if (result := future.result())]
170+
fetch_workgroup(name)
173171
return workgroups
174172

175173
# noinspection PyUnboundLocalVariable
@@ -255,6 +253,8 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]:
255253

256254
@classmethod
257255
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
256+
catalogs: List[AwsResource] = []
257+
258258
def fetch_data_catalog(data_catalog_name: str) -> Optional[AwsAthenaDataCatalog]:
259259
result = builder.client.get(
260260
aws_service=service_name,
@@ -266,6 +266,7 @@ def fetch_data_catalog(data_catalog_name: str) -> Optional[AwsAthenaDataCatalog]
266266
return None
267267
if catalog := AwsAthenaDataCatalog.from_api(result, builder):
268268
catalog.set_arn(builder=builder, resource=f"datacatalog/{catalog.name}")
269+
catalogs.append(catalog)
269270
builder.add_node(catalog, result)
270271
builder.submit_work(service_name, add_tags, catalog)
271272
return catalog
@@ -281,14 +282,10 @@ def add_tags(data_catalog: AwsAthenaDataCatalog) -> None:
281282
if tags:
282283
data_catalog.tags = bend(S("Tags", default=[]) >> ToDict(), tags[0])
283284

284-
futures = []
285285
for js in json:
286286
# we filter out the default data catalog as it is not possible to do much with it
287287
if (name := js.get("CatalogName")) is not None and isinstance(name, str) and name != "AwsDataCatalog":
288-
future = builder.submit_work(service_name, fetch_data_catalog, name)
289-
futures.append(future)
290-
futures_wait(futures)
291-
catalogs: List[AwsResource] = [result for future in futures if (result := future.result())]
288+
fetch_data_catalog(name)
292289
return catalogs
293290

294291
def update_resource_tag(self, client: AwsClient, key: str, value: str) -> bool:

plugins/aws/fix_plugin_aws/resource/cloudtrail.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ def collect_tags(trail: AwsCloudTrail) -> None:
257257
builder.add_deferred_edge(
258258
builder.region, EdgeType.default, f'is(aws_cloud_trail) and reported.arn=="{arn}"'
259259
)
260-
futures_wait(futures)
260+
futures_wait(futures) # only continue, when all task definitions are collected
261261
instances: List[AwsResource] = [result for future in futures if (result := future.result())]
262262
return instances
263263

plugins/aws/fix_plugin_aws/resource/cognito.py

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
from attrs import define, field
22
from typing import ClassVar, Dict, List, Optional, Type, Tuple, Any
3-
from concurrent.futures import wait as futures_wait
4-
53
from fix_plugin_aws.aws_client import AwsClient
64
from fix_plugin_aws.resource.base import AwsApiSpec, AwsResource, GraphBuilder
75
from fix_plugin_aws.resource.iam import AwsIamRole
@@ -245,41 +243,32 @@ def called_mutator_apis(cls) -> List[AwsApiSpec]:
245243

246244
@classmethod
247245
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
248-
pool_instances: List[AwsResource] = []
246+
instances: List[AwsResource] = []
249247

250248
def add_tags(pool: AwsCognitoUserPool) -> None:
251249
tags = builder.client.get(service_name, "list-tags-for-resource", "Tags", ResourceArn=pool.arn)
252250
if tags:
253251
pool.tags = tags
254252

255-
def fetch_additional_instances(pool_instance: AwsCognitoUser) -> List[AwsResource]:
256-
instances: List[AwsResource] = []
257-
for user in builder.client.list(service_name, "list-users", "Users", UserPoolId=pool_instance.id):
258-
if user_instance := AwsCognitoUser.from_api(user, builder):
259-
user_instance.pool_name = pool_instance.name
260-
user_instance._pool_id = pool_instance.id
261-
instances.append(user_instance)
262-
builder.add_node(user_instance, user)
263-
builder.add_edge(from_node=pool_instance, edge_type=EdgeType.default, node=user_instance)
264-
for group in builder.client.list(service_name, "list-groups", "Groups", UserPoolId=pool_instance.id):
265-
if group_instance := AwsCognitoGroup.from_api(group, builder):
266-
instances.append(group_instance)
267-
builder.add_node(group_instance, group)
268-
builder.add_edge(from_node=pool_instance, edge_type=EdgeType.default, node=group_instance)
269-
return instances
270-
271-
futures = []
272253
for pool in json:
273254
if pool_instance := cls.from_api(pool, builder):
274255
pool_instance.set_arn(builder=builder, resource=f"userpool/{pool_instance.id}")
275-
pool_instances.append(pool_instance)
256+
instances.append(pool_instance)
276257
builder.add_node(pool_instance, pool)
277258
builder.submit_work(service_name, add_tags, pool_instance)
278-
future = builder.submit_work(service_name, fetch_additional_instances, pool_instance)
279-
futures.append(future)
280-
futures_wait(futures)
281-
user_and_groups: List[AwsResource] = [result for future in futures for result in future.result()]
282-
return pool_instances + user_and_groups
259+
for user in builder.client.list(service_name, "list-users", "Users", UserPoolId=pool_instance.id):
260+
if user_instance := AwsCognitoUser.from_api(user, builder):
261+
user_instance.pool_name = pool_instance.name
262+
user_instance._pool_id = pool_instance.id
263+
instances.append(user_instance)
264+
builder.add_node(user_instance, user)
265+
builder.add_edge(from_node=pool_instance, edge_type=EdgeType.default, node=user_instance)
266+
for group in builder.client.list(service_name, "list-groups", "Groups", UserPoolId=pool_instance.id):
267+
if group_instance := AwsCognitoGroup.from_api(group, builder):
268+
instances.append(group_instance)
269+
builder.add_node(group_instance, group)
270+
builder.add_edge(from_node=pool_instance, edge_type=EdgeType.default, node=group_instance)
271+
return instances
283272

284273
def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
285274
if self.lambda_config:

plugins/aws/fix_plugin_aws/resource/dynamodb.py

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from datetime import datetime
22
from typing import ClassVar, Dict, List, Optional, Type, Any
33
from attrs import define, field
4-
from concurrent.futures import Future, wait as futures_wait
54
from fix_plugin_aws.aws_client import AwsClient
65
from fix_plugin_aws.resource.base import AwsApiSpec, AwsResource, GraphBuilder
76
from fix_plugin_aws.resource.kinesis import AwsKinesisStream
@@ -402,27 +401,24 @@ def called_collect_apis(cls) -> List[AwsApiSpec]:
402401

403402
@classmethod
404403
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
405-
def add_instance(table: str) -> Optional[AwsResource]:
404+
instances = []
405+
406+
def add_instance(table: str) -> None:
406407
table_description = builder.client.get(service_name, "describe-table", "Table", TableName=table)
407408
if table_description is not None:
408409
if instance := cls.from_api(table_description, builder):
410+
instances.append(instance)
409411
builder.add_node(instance, table_description)
410412
builder.submit_work(service_name, add_tags, instance)
411-
return instance
412-
return None
413413

414414
def add_tags(table: AwsDynamoDbTable) -> None:
415415
tags = builder.client.list(service_name, "list-tags-of-resource", "Tags", ResourceArn=table.arn)
416416
if tags:
417417
table.tags = bend(ToDict(), tags)
418418

419-
futures: List[Future[Optional[AwsResource]]] = []
420419
for js in json:
421420
if isinstance(js, str):
422-
future = builder.submit_work(service_name, add_instance, js)
423-
futures.append(future)
424-
futures_wait(futures)
425-
instances: List[AwsResource] = [result for future in futures if (result := future.result())]
421+
add_instance(js)
426422
return instances
427423

428424
def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
@@ -493,7 +489,9 @@ def called_collect_apis(cls) -> List[AwsApiSpec]:
493489

494490
@classmethod
495491
def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> List[AwsResource]:
496-
def add_instance(table: Dict[str, str]) -> Optional[AwsResource]:
492+
instances = []
493+
494+
def add_instance(table: Dict[str, str]) -> None:
497495
table_description = builder.client.get(
498496
service_name,
499497
"describe-global-table",
@@ -502,22 +500,17 @@ def add_instance(table: Dict[str, str]) -> Optional[AwsResource]:
502500
)
503501
if table_description:
504502
if instance := cls.from_api(table_description, builder):
503+
instances.append(instance)
505504
builder.add_node(instance, table_description)
506505
builder.submit_work(service_name, add_tags, instance)
507-
return instance
508-
return None
509506

510507
def add_tags(table: AwsDynamoDbGlobalTable) -> None:
511508
tags = builder.client.list(service_name, "list-tags-of-resource", "Tags", ResourceArn=table.arn)
512509
if tags:
513510
table.tags = bend(ToDict(), tags)
514511

515-
futures = []
516512
for js in json:
517-
future = builder.submit_work(service_name, add_instance, js)
518-
futures.append(future)
519-
futures_wait(futures)
520-
instances: List[AwsResource] = [result for future in futures if (result := future.result())]
513+
add_instance(js)
521514
return instances
522515

523516
def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:

0 commit comments

Comments
 (0)