Commit c626700

aquamatthias authored
[gcp][feat] Add metrics collection (#2283)
Co-authored-by: Matthias Veit <matthias_veit@yahoo.de>
1 parent 786318a commit c626700

20 files changed: +857 -36 lines changed

fixlib/fixlib/baseresources.py

Lines changed: 5 additions & 2 deletions
@@ -116,8 +116,7 @@ def get(self) -> Dict[str, Any]:
         return changes


-# todo: replace to StrEnum once resoto is on 3.11
-class MetricName(str, Enum):
+class MetricName(StrEnum):
     def __str__(self) -> str:
         return self.value

@@ -150,6 +149,8 @@ def __str__(self) -> str:

     # load balancers
     RequestCount = "request"  # _count will be added to the end because of the unit
+    RequestBytesCount = "request_bytes"  # _count will be added to the end because of the unit
+    ResponseBytesCount = "response_bytes"  # _count will be added to the end because of the unit
     ActiveConnectionCount = "active_connection"  # _count will be added to the end because of the unit
     ALBActiveConnectionCount = "alb_active_connection"  # _count will be added to the end because of the unit
     ConnectionAttemptCount = "connection_attempt"  # _count will be added to the end because of the unit
@@ -195,6 +196,8 @@ def __str__(self) -> str:
     DiskQueueDepth = "disk_queue_depth"
     NetworkReceiveThroughput = "network_receive_throughput"
     NetworkTransmitThroughput = "network_transmit_throughput"
+    NetworkBytesSent = "network_bytes_sent"
+    NetworkBytesReceived = "network_bytes_received"

     # serverless
     Invocations = "invocations"
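
The first hunk retires the pre-3.11 workaround: `enum.StrEnum` members are `str` instances whose `str()` and `format()` already yield the raw value, which is what the old `str` mixin plus the `__str__` override emulated. A minimal sketch of the difference (assumes Python 3.11+):

from enum import Enum, StrEnum

class Old(str, Enum):
    RequestCount = "request"

class New(StrEnum):
    RequestCount = "request"

# Both compare equal to the raw string ...
assert Old.RequestCount == "request" and New.RequestCount == "request"
# ... but since 3.11, formatting the plain mixin goes through Enum.__str__
# and yields "Old.RequestCount" unless __str__ is overridden; StrEnum always
# renders the bare value.
assert f"{Old.RequestCount}" == "Old.RequestCount"
assert f"{New.RequestCount}" == "request"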

plugins/aws/fix_plugin_aws/collector.py

Lines changed: 3 additions & 2 deletions
@@ -242,11 +242,11 @@ def get_last_run() -> Optional[datetime]:
             try:
                 log.info(f"[Aws:{self.account.id}] Collect usage metrics.")
                 self.collect_usage_metrics(global_builder)
+                shared_queue.wait_for_submitted_work()
             except Exception as e:
                 log.warning(
                     f"Failed to collect usage metrics on account {self.account.id} in region {global_builder.region.id}: {e}"
                 )
-            shared_queue.wait_for_submitted_work()

             # call all registered after collect hooks
             for after_collect in global_builder.after_collect_actions:
@@ -334,8 +334,9 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> None:
                 continue
             # region can be overridden in the query: s3 is global, but need to be queried per region
             if region := cast(AwsRegion, resource.region()):
-                lookup_map[resource.id] = resource
                 resource_queries: List[cloudwatch.AwsCloudwatchQuery] = resource.collect_usage_metrics(builder)
+                if resource_queries:
+                    lookup_map[resource.id] = resource
                 for query in resource_queries:
                     query_region = query.region or region
                     start = query.start_delta or builder.metrics_delta
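
Moving `shared_queue.wait_for_submitted_work()` inside the `try` block matters because metric queries run on a shared executor: their exceptions only surface when the queue is drained, and draining after the `except` clause let those failures escape the handler. A self-contained sketch of the failure mode, with illustrative names:

import logging
from concurrent.futures import ThreadPoolExecutor

log = logging.getLogger("sketch")

def metric_query() -> None:
    raise RuntimeError("throttled")  # simulated failure inside a queued task

with ThreadPoolExecutor() as executor:
    try:
        future = executor.submit(metric_query)  # stands in for collect_usage_metrics
        future.result()  # stands in for wait_for_submitted_work(): the error surfaces here
    except Exception as e:
        log.warning(f"Failed to collect usage metrics: {e}")  # now caught, not propagated

The second hunk is a small bookkeeping fix: a resource only enters `lookup_map` if it actually produced CloudWatch queries.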

plugins/azure/fix_plugin_azure/resource/metrics.py

Lines changed: 4 additions & 2 deletions
@@ -1,3 +1,4 @@
+from copy import deepcopy
 from datetime import datetime, timedelta
 from concurrent.futures import as_completed
 import logging
@@ -271,12 +272,13 @@ def _query_for_single(
         interval: str,
     ) -> "Tuple[Optional[AzureMetricData], Optional[str]]":
         try:
+            local_api_spec = deepcopy(api_spec)
             # Set the path for the API call based on the instance ID of the query
-            api_spec.path = f"{query.instance_id}/providers/Microsoft.Insights/metrics"
+            local_api_spec.path = f"{query.instance_id}/providers/Microsoft.Insights/metrics"
             # Retrieve metric data from the API
             aggregation = ",".join(query.aggregation)
             part = builder.client.list(
-                api_spec,
+                local_api_spec,
                 metricnames=query.metric_name,
                 metricNamespace=query.metric_namespace,
                 timespan=timespan,
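
`_query_for_single` runs on worker threads (note the `as_completed` import at the top of the file), and each call previously wrote `api_spec.path` on a spec object shared across queries, so concurrent queries could overwrite each other's path mid-flight. Deep-copying first confines the mutation. A self-contained sketch of the hazard, with an illustrative spec type:

from copy import deepcopy
from dataclasses import dataclass

@dataclass
class ApiSpec:  # stand-in for the plugin's real spec object
    path: str = ""

shared_spec = ApiSpec()

def build_path(instance_id: str) -> str:
    local_spec = deepcopy(shared_spec)  # per-query copy, as in the fix
    local_spec.path = f"{instance_id}/providers/Microsoft.Insights/metrics"
    return local_spec.path

print(build_path("/subscriptions/s1/virtualMachines/vm1"))
print(shared_spec.path)  # still "": the shared object is never mutated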

plugins/gcp/fix_plugin_gcp/__init__.py

Lines changed: 5 additions & 2 deletions
@@ -10,6 +10,7 @@
 from fixlib.core.actions import CoreFeedback
 from fixlib.graph import Graph, MaxNodesExceeded
 from fixlib.logger import log, setup_logger
+from fixlib.types import Json
 from .collector import GcpProjectCollector
 from .config import GcpConfig
 from .resources.base import GcpProject
@@ -77,10 +78,11 @@ def collect(self) -> None:
                     project_id,
                     feedback,
                     cloud,
+                    self.task_data or {},
                     max_resources_per_account=self.max_resources_per_account,
                     **collect_args,
                 )
-                for project_id in credentials.keys()
+                for project_id in credentials
             ]
             for future in futures.as_completed(wait_for):
                 project_graph = future.result()
@@ -98,6 +100,7 @@ def collect_project(
     project_id: str,
     core_feedback: CoreFeedback,
     cloud: Cloud,
+    task_data: Json,
     args: Optional[Namespace] = None,
     running_config: Optional[RunningConfig] = None,
     credentials: Optional[Dict[str, Any]] = None,
@@ -130,7 +133,7 @@ def collect_project(

     try:
         core_feedback.progress_done(project_id, 0, 1)
-        gpc = GcpProjectCollector(Config.gcp, cloud, project, core_feedback, max_resources_per_account)
+        gpc = GcpProjectCollector(Config.gcp, cloud, project, core_feedback, task_data, max_resources_per_account)
         try:
             gpc.collect()
         except MaxNodesExceeded as ex:

plugins/gcp/fix_plugin_gcp/collector.py

Lines changed: 39 additions & 1 deletion
@@ -1,6 +1,7 @@
 import logging
 from concurrent.futures import ThreadPoolExecutor
-from typing import Type, List, Any, Optional
+from datetime import datetime, timezone
+from typing import Type, List, Any, Optional, cast

 from fix_plugin_gcp.config import GcpConfig
 from fix_plugin_gcp.gcp_client import GcpApiSpec
@@ -14,12 +15,15 @@
     firestore,
     filestore,
     cloudfunctions,
+    monitoring,
 )
 from fix_plugin_gcp.resources.base import GcpResource, GcpProject, ExecutorQueue, GraphBuilder, GcpRegion, GcpZone
 from fix_plugin_gcp.utils import Credentials
 from fixlib.baseresources import Cloud
 from fixlib.core.actions import CoreFeedback, ErrorAccumulator
 from fixlib.graph import Graph
+from fixlib.json import value_in_path
+from fixlib.types import Json

 log = logging.getLogger("fix.plugins.gcp")
 all_resources: List[Type[GcpResource]] = (
@@ -58,6 +62,7 @@ def __init__(
         cloud: Cloud,
         project: GcpProject,
         core_feedback: CoreFeedback,
+        task_data: Json,
         max_resources_per_account: Optional[int] = None,
     ) -> None:
         self.config = config
@@ -67,6 +72,7 @@ def __init__(
         self.error_accumulator = ErrorAccumulator()
         self.graph = Graph(root=self.project, max_nodes=max_resources_per_account)
         self.credentials = Credentials.get(self.project.id)
+        self.task_data = task_data

     def collect(self) -> None:
         with ThreadPoolExecutor(
@@ -77,7 +83,20 @@ def collect(self) -> None:
             # It should only be used in scenarios, where it is safe to do so.
             # This executor is shared between all regions.
             shared_queue = ExecutorQueue(executor, self.project.safe_name)
+
+            def get_last_run() -> Optional[datetime]:
+                td = self.task_data
+                if not td:
+                    return None
+                timestamp = value_in_path(td, ["timing", td.get("step", ""), "started_at"])
+
+                if timestamp is None:
+                    return None
+
+                return datetime.fromtimestamp(timestamp, timezone.utc)
+
             project_global_region = GcpRegion.fallback_global_region(self.project)
+            last_run = get_last_run()
             global_builder = GraphBuilder(
                 self.graph,
                 self.cloud,
@@ -87,6 +106,8 @@ def collect(self) -> None:
                 self.core_feedback,
                 self.error_accumulator,
                 project_global_region,
+                config=self.config,
+                last_run_started_at=last_run,
             )
             global_builder.add_node(project_global_region, {})

@@ -113,6 +134,13 @@ def collect(self) -> None:

             self.error_accumulator.report_all(global_builder.core_feedback)

+            if global_builder.config.collect_usage_metrics:
+                try:
+                    log.info(f"[GCP:{self.project.id}] Collect usage metrics.")
+                    self.collect_usage_metrics(global_builder)
+                    global_builder.executor.wait_for_submitted_work()
+                except Exception as e:
+                    log.warning(f"Failed to collect usage metrics in project {self.project.id}: {e}")
             log.info(f"[GCP:{self.project.id}] Connect resources and create edges.")
             # connect nodes
             for node, data in list(self.graph.nodes(data=True)):
@@ -128,9 +156,19 @@ def collect(self) -> None:
                 if isinstance(node, GcpResource):
                     node.post_process_instance(global_builder, data.get("source", {}))

+            global_builder.executor.wait_for_submitted_work()
+
             self.core_feedback.progress_done(self.project.id, 1, 1, context=[self.cloud.id])
             log.info(f"[GCP:{self.project.id}] Collecting resources done.")

+    def collect_usage_metrics(self, builder: GraphBuilder) -> None:
+        for resource in builder.graph.nodes:
+            if isinstance(resource, GcpResource) and (mq := resource.collect_usage_metrics(builder)):
+                start_at = builder.created_at - builder.metrics_delta
+                region = cast(GcpRegion, resource.region())
+                rb = builder.for_region(region)
+                monitoring.GcpMonitoringMetricData.query_for(rb, resource, mq, start_at, builder.created_at)
+
     def remove_unconnected_nodes(self, builder: GraphBuilder) -> None:
         def rm_leaf_nodes(clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None:
             remove_nodes = set()
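
`get_last_run` reads the start time of the current workflow step from the task data handed down by core and passes it to the `GraphBuilder` as `last_run_started_at`. A sketch with an assumed payload shape (illustrative, not a verbatim core message):

from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Assumed shape: a step name plus a per-step timing map keyed by that name,
# where started_at is epoch seconds.
task_data: Dict[str, Any] = {
    "step": "collect",
    "timing": {"collect": {"started_at": 1716451200}},
}

def get_last_run(td: Dict[str, Any]) -> Optional[datetime]:
    # mirrors value_in_path(td, ["timing", td.get("step", ""), "started_at"])
    ts = td.get("timing", {}).get(td.get("step", ""), {}).get("started_at")
    return None if ts is None else datetime.fromtimestamp(ts, timezone.utc)

print(get_last_run(task_data))  # 2024-05-23 08:00:00+00:00

`collect_usage_metrics` then walks every collected `GcpResource`, lets each resource describe its own monitoring queries, and submits them per region over the `[created_at - metrics_delta, created_at]` window.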

plugins/gcp/fix_plugin_gcp/config.py

Lines changed: 6 additions & 3 deletions
@@ -1,6 +1,5 @@
-from fixlib.proc import num_default_threads
 from attrs import define, field
-from typing import List, ClassVar
+from typing import List, ClassVar, Optional


 @define
@@ -17,7 +16,7 @@ class GcpConfig:
         metadata={"description": "GCP services to exclude (default: none)"},
     )
     project_pool_size: int = field(
-        factory=num_default_threads,
+        default=64,
         metadata={"description": "GCP project thread/process pool size"},
     )
     fork_process: bool = field(
@@ -31,6 +30,10 @@ class GcpConfig:
             "If false, the error is logged and the resource is skipped."
         },
     )
+    collect_usage_metrics: Optional[bool] = field(
+        default=True,
+        metadata={"description": "Collect resource usage metrics via GCP Monitoring, enabled by default"},
+    )

     def should_collect(self, name: str) -> bool:
         if self.collect:
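
The new flag ships enabled, so metric collection is opt-out. A sketch of flipping it off programmatically, assuming the remaining `GcpConfig` fields all carry defaults:

from fix_plugin_gcp.config import GcpConfig

config = GcpConfig(collect_usage_metrics=False)  # skip the GCP Monitoring queries
assert config.collect_usage_metrics is False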

plugins/gcp/fix_plugin_gcp/resources/aiplatform.py

Lines changed: 3 additions & 3 deletions
@@ -50,8 +50,6 @@ class AIPlatformRegionFilter:
     def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]:
         # Default behavior: in case the class has an ApiSpec, call the api and call collect.
         if issubclass(cls, GcpResource):
-            region_name = "global" if not builder.region else builder.region.safe_name
-            log.info(f"[GCP:{builder.project.id}:{region_name}] Collecting {cls.kind}")
             if spec := cls.api_spec:
                 expected_errors = GcpExpectedErrorCodes | (spec.expected_errors or set()) | {"HttpError:none:none"}
                 with GcpErrorHandler(
@@ -66,7 +64,9 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]:
                 if builder.region:
                     items = builder.client.list(spec, **kwargs)
                     collected_resources = cls.collect(items, builder)
-                    log.info(f"[GCP:{builder.project.id}] finished collecting: {cls.kind}")
+                    log.info(
+                        f"[GCP:{builder.project.id}:{builder.region.safe_name}] finished collecting: {cls.kind}"
+                    )
                     return collected_resources
         return []
