Skip to content

Commit e0f8435

Browse files
1101-1aquamatthias
andauthored
[gcp][fix]: Deduplicate error messages in accumulator (#2223)
Co-authored-by: Matthias Veit <matthias_veit@yahoo.de>
1 parent e27feda commit e0f8435

File tree

4 files changed

+59
-27
lines changed

4 files changed

+59
-27
lines changed

plugins/gcp/fix_plugin_gcp/collector.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from fix_plugin_gcp.resources.base import GcpResource, GcpProject, ExecutorQueue, GraphBuilder, GcpRegion, GcpZone
99
from fix_plugin_gcp.utils import Credentials
1010
from fixlib.baseresources import Cloud
11-
from fixlib.core.actions import CoreFeedback
11+
from fixlib.core.actions import CoreFeedback, ErrorAccumulator
1212
from fixlib.graph import Graph
1313

1414
log = logging.getLogger("fix.plugins.gcp")
@@ -51,13 +51,15 @@ def __init__(
5151
self.cloud = cloud
5252
self.project = project
5353
self.core_feedback = core_feedback
54+
self.error_accumulator = ErrorAccumulator()
5455
self.graph = Graph(root=self.project, max_nodes=max_resources_per_account)
5556
self.credentials = Credentials.get(self.project.id)
5657

5758
def collect(self) -> None:
5859
with ThreadPoolExecutor(
5960
thread_name_prefix=f"gcp_{self.project.id}", max_workers=self.config.project_pool_size
6061
) as executor:
62+
self.core_feedback.progress_done(self.project.id, 0, 1, context=[self.cloud.id])
6163
# The shared executor is used to parallelize the collection of resources "as fast as possible"
6264
# It should only be used in scenarios, where it is safe to do so.
6365
# This executor is shared between all regions.
@@ -70,6 +72,7 @@ def collect(self) -> None:
7072
self.credentials,
7173
shared_queue,
7274
self.core_feedback,
75+
self.error_accumulator,
7376
project_global_region,
7477
)
7578
global_builder.add_node(project_global_region, {})
@@ -95,6 +98,8 @@ def collect(self) -> None:
9598
global_builder.submit_work(self.collect_region, region, global_builder.for_region(region))
9699
global_builder.executor.wait_for_submitted_work()
97100

101+
self.error_accumulator.report_all(global_builder.core_feedback)
102+
98103
log.info(f"[GCP:{self.project.id}] Connect resources and create edges.")
99104
# connect nodes
100105
for node, data in list(self.graph.nodes(data=True)):
@@ -110,6 +115,7 @@ def collect(self) -> None:
110115
if isinstance(node, GcpResource):
111116
node.post_process_instance(global_builder, data.get("source", {}))
112117

118+
self.core_feedback.progress_done(self.project.id, 1, 1, context=[self.cloud.id])
113119
log.info(f"[GCP:{self.project.id}] Collecting resources done.")
114120

115121
def remove_unconnected_nodes(self):

plugins/gcp/fix_plugin_gcp/resources/aiplatform.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,15 @@ class AIPlatformRegionFilter:
3535
def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]:
3636
# Default behavior: in case the class has an ApiSpec, call the api and call collect.
3737
if issubclass(cls, GcpResource):
38-
if kwargs:
39-
log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind} with ({kwargs})")
40-
else:
41-
log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind}")
38+
region_name = "global" if not builder.region else builder.region.safe_name
39+
log.info(f"[GCP:{builder.project.id}:{region_name}] Collecting {cls.kind}")
4240
if spec := cls.api_spec:
4341
expected_errors = GcpExpectedErrorCodes | (spec.expected_errors or set()) | {"HttpError:none:none"}
4442
with GcpErrorHandler(
45-
builder.core_feedback,
43+
spec.action,
44+
builder.error_accumulator,
45+
spec.service,
46+
builder.region.safe_name if builder.region else None,
4647
expected_errors,
4748
f" in {builder.project.id} kind {cls.kind}",
4849
expected_message_substrings,

plugins/gcp/fix_plugin_gcp/resources/base.py

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
ModelReference,
2424
)
2525
from fixlib.config import Config
26-
from fixlib.core.actions import CoreFeedback
26+
from fixlib.core.actions import CoreFeedback, ErrorAccumulator
2727
from fixlib.graph import Graph, EdgeKey
2828
from fixlib.json import from_json as from_js, value_in_path
2929
from fixlib.json_bender import bend, Bender, S, Bend, MapDict, F
@@ -78,6 +78,7 @@ def __init__(
7878
credentials: GoogleAuthCredentials,
7979
executor: ExecutorQueue,
8080
core_feedback: CoreFeedback,
81+
error_accumulator: ErrorAccumulator,
8182
fallback_global_region: GcpRegion,
8283
region: Optional[GcpRegion] = None,
8384
graph_nodes_access: Optional[Lock] = None,
@@ -87,12 +88,11 @@ def __init__(
8788
self.cloud = cloud
8889
self.region = region
8990
self.project = project
90-
self.client = GcpClient(
91-
credentials, project_id=project.id, region=region.name if region else None, core_feedback=core_feedback
92-
)
91+
self.client = GcpClient(credentials, project_id=project.id, region=region.name if region else None)
9392
self.executor = executor
9493
self.name = f"GCP:{project.name}"
9594
self.core_feedback = core_feedback
95+
self.error_accumulator = error_accumulator
9696
self.fallback_global_region = fallback_global_region
9797
self.region_by_name: Dict[str, GcpRegion] = {}
9898
self.region_by_zone_name: Dict[str, GcpRegion] = {}
@@ -272,6 +272,7 @@ def for_region(self, region: GcpRegion) -> GraphBuilder:
272272
self.client.credentials,
273273
self.executor,
274274
self.core_feedback,
275+
self.error_accumulator,
275276
self.fallback_global_region,
276277
region,
277278
self.graph_nodes_access,
@@ -381,7 +382,14 @@ def collect_resources(cls: Type[GcpResource], builder: GraphBuilder, **kwargs: A
381382
log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind}")
382383
if spec := cls.api_spec:
383384
expected_errors = GcpExpectedErrorCodes | (spec.expected_errors or set())
384-
with GcpErrorHandler(builder.core_feedback, expected_errors, f" in {builder.project.id} kind {cls.kind}"):
385+
with GcpErrorHandler(
386+
spec.action,
387+
builder.error_accumulator,
388+
spec.service,
389+
builder.region.safe_name if builder.region else None,
390+
expected_errors,
391+
f" in {builder.project.id} kind {cls.kind}",
392+
):
385393
items = builder.client.list(spec, **kwargs)
386394
resources = cls.collect(items, builder)
387395
log.info(f"[GCP:{builder.project.id}] finished collecting: {cls.kind}")
@@ -610,12 +618,18 @@ class GcpZone(GcpResource, BaseZone):
610618
class GcpErrorHandler:
611619
def __init__(
612620
self,
613-
core_feedback: CoreFeedback,
621+
action: str,
622+
error_accumulator: ErrorAccumulator,
623+
service: str,
624+
region: Optional[str],
614625
expected_errors: Set[str],
615626
extra_info: str = "",
616627
expected_message_substrings: Optional[Set[str]] = None,
617628
) -> None:
618-
self.core_feedback = core_feedback
629+
self.action = action
630+
self.error_accumulator = error_accumulator
631+
self.service = service
632+
self.region = region
619633
self.extra_info = extra_info
620634
self.expected_errors = expected_errors
621635
self.expected_message_substrings = expected_message_substrings
@@ -649,31 +663,39 @@ def __exit__(
649663
if self.expected_message_substrings:
650664
for substring in self.expected_message_substrings:
651665
if substring in error_details:
652-
log.info(f"Ignoring expected HttpError in {self.extra_info}: {error_details}.")
653666
return True # Suppress the exception
654667
except Exception as ex:
655668
errors = {f"ParseError:unknown:{ex}"}
656669
error_summary = " Error Codes: " + (", ".join(errors)) if errors else ""
657670

658671
if errors and errors.issubset(self.expected_errors):
659-
log.info(
672+
log.debug(
660673
f"Expected Exception while collecting{self.extra_info} ({exc_type.__name__}): "
661674
f"{error_details}{error_summary}. Ignore."
662675
)
663676
return True
664677

665678
if not Config.gcp.discard_account_on_resource_error:
666-
self.core_feedback.error(
667-
f"Error while collecting{self.extra_info} ({exc_type.__name__}): " f"{error_details}{error_summary}",
668-
log,
679+
if exc_type is HttpError and isinstance(exc_value, HttpError):
680+
if exc_value.resp.status == 403:
681+
self.error_accumulator.add_error(
682+
as_info=False,
683+
error_kind="AccessDenied",
684+
service=self.service,
685+
action=self.action,
686+
message=f"Access denied: {error_details}",
687+
region=None,
688+
)
689+
return True
690+
691+
self.error_accumulator.add_error(
692+
as_info=False,
693+
error_kind=exc_type.__name__,
694+
service=self.service,
695+
action=self.action,
696+
message=f"Error while collecting{self.extra_info}: {error_details}{error_summary}",
697+
region=self.region,
669698
)
670699
return True
671700

672-
if exc_type is HttpError and isinstance(exc_value, HttpError):
673-
if exc_value.resp.status == 403:
674-
self.core_feedback.error(
675-
f"Access denied{self.extra_info}: {error_details} Error Codes: {error_summary}", log
676-
)
677-
return True
678-
679701
return False

plugins/gcp/test/conftest.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from fix_plugin_gcp.resources.base import GcpRegion, GraphBuilder, GcpProject
1212
from fixlib.baseresources import Cloud
1313
from fixlib.config import Config
14-
from fixlib.core.actions import CoreFeedback
14+
from fixlib.core.actions import CoreFeedback, ErrorAccumulator
1515
from fixlib.graph import Graph
1616
from fixlib.threading import ExecutorQueue
1717
from .random_client import build_random_data_client, random_predefined
@@ -27,10 +27,13 @@ def random_builder() -> Iterator[GraphBuilder]:
2727
gcp_client._discovery_function = build_random_data_client
2828
queue = ExecutorQueue(executor, "dummy")
2929
feedback = CoreFeedback("test", "test", "test", Queue())
30+
accumulator = ErrorAccumulator()
3031
project = GcpProject(id="test")
3132
project_global_region = GcpRegion.fallback_global_region(project)
3233
credentials = AnonymousCredentials() # type: ignore
33-
builder = GraphBuilder(Graph(), Cloud(id="gcp"), project, credentials, queue, feedback, project_global_region)
34+
builder = GraphBuilder(
35+
Graph(), Cloud(id="gcp"), project, credentials, queue, feedback, accumulator, project_global_region
36+
)
3437
builder.add_node(project_global_region, {})
3538
# add predefined regions and zones
3639
for predefined in random_predefined:

0 commit comments

Comments
 (0)