Skip to content

Commit

Permalink
[gcp][fix] Zonal Handling, Resource removal and speed improvements (#…
Browse files Browse the repository at this point in the history
…1689)

* [gcp][fix] Zonal Handling, Resource removal and speed improvements

* pylint

* do not delete quotas
  • Loading branch information
aquamatthias committed Jun 28, 2023
1 parent ee18821 commit 267c5fd
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 42 deletions.
42 changes: 30 additions & 12 deletions plugins/gcp/resoto_plugin_gcp/collector.py
@@ -1,12 +1,12 @@
from concurrent.futures import ThreadPoolExecutor
import logging
from typing import Type, List
from concurrent.futures import ThreadPoolExecutor
from typing import Type, List, Any, Optional

from resoto_plugin_gcp.config import GcpConfig
from resoto_plugin_gcp.gcp_client import GcpApiSpec
from resoto_plugin_gcp.utils import Credentials
from resoto_plugin_gcp.resources import compute, container, billing, sqladmin, storage
from resoto_plugin_gcp.resources.base import GcpResource, GcpProject, ExecutorQueue, GraphBuilder, GcpRegion, GcpZone
from resoto_plugin_gcp.utils import Credentials
from resotolib.baseresources import Cloud
from resotolib.core.actions import CoreFeedback
from resotolib.graph import Graph
Expand Down Expand Up @@ -81,32 +81,50 @@ def collect(self) -> None:
if region.name == "global":
continue
global_builder.submit_work(self.collect_region, region, global_builder.for_region(region))

global_builder.executor.wait_for_submitted_work()

# connect nodes
for node, data in list(self.graph.nodes(data=True)):
if isinstance(node, GcpResource):
node.connect_in_graph(global_builder, data.get("source", {}))

log.info(f"[GCP:{self.project.id}] Collecting resources done.")
# remove unconnected nodes
self.remove_unconnected_nodes()

# post process nodes
for node, data in list(self.graph.nodes(data=True)):
if isinstance(node, GcpResource):
node.post_process_instance(global_builder, data.get("source", {}))

log.info(f"[GCP:{self.project.id}] Collecting resources done.")

def remove_unconnected_nodes(self):
remove_nodes = set()
remove_nodes = []

def rmnodes(cls) -> None:
def rm_nodes(cls, ignore_kinds: Optional[Type[Any]] = None) -> None:
for node in self.graph.nodes:
if isinstance(node, cls) and not any(True for _ in self.graph.successors(node)):
remove_nodes.add(node)
if not isinstance(node, cls):
continue
suc = list(self.graph.successors(node))
filtered = [s for s in suc if not isinstance(s, ignore_kinds)] if ignore_kinds else suc
if not filtered:
remove_nodes.extend(suc)
remove_nodes.append(node)
removed = set()
for node in remove_nodes:
if node in removed:
continue
removed.add(node)
self.graph.remove_node(node)
log.debug(f"Removing {len(remove_nodes)} unreferenced nodes of type {cls}")
remove_nodes.clear()

# nodes need to be removed in the correct order
rmnodes((compute.GcpNodeType, compute.GcpMachineType, compute.GcpDiskType, compute.GcpAcceleratorType))
rmnodes(billing.GcpSku)
rmnodes(billing.GcpService)
rm_nodes((compute.GcpNodeType, compute.GcpDiskType))
rm_nodes(compute.GcpMachineType, compute.GcpAcceleratorType) # ignore accelerator types
rm_nodes(compute.GcpAcceleratorType)
rm_nodes(billing.GcpSku)
rm_nodes(billing.GcpService)

def collect_region(self, region: GcpRegion, regional_builder: GraphBuilder) -> None:
# fetch all region level resources
Expand Down
24 changes: 19 additions & 5 deletions plugins/gcp/resoto_plugin_gcp/gcp_client.py
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Optional, List, Dict, Any, Set
from typing import Optional, List, Dict, Any, Set, Tuple

from attr import define, evolve
from google.auth.credentials import Credentials
Expand Down Expand Up @@ -160,11 +160,11 @@ def next_responses(request: Any) -> None:
response = request.execute()
page = value_in_path(response, api_spec.response_path)
if (sub_path := api_spec.response_regional_sub_path) is not None and isinstance(page, dict):
for zone_marker, zone_response in page.items():
zone = zone_marker.split("/")[-1]
for item in value_in_path(zone_response, sub_path) or []:
for zonal_marker, zonal_response in page.items():
zone_prop, zonal_name = self.__extract_zonal_prop(zonal_marker)
for item in value_in_path(zonal_response, sub_path) or []:
# store the zone as part of the item
item[InternalZoneProp] = zone
item[zone_prop] = zonal_name
result.append(item)
elif isinstance(page, list):
result.extend(page)
Expand All @@ -180,3 +180,17 @@ def next_responses(request: Any) -> None:

next_responses(getattr(executor, api_spec.action)(**params))
return result

@staticmethod
def __extract_zonal_prop(name: str) -> Tuple[str, str]:
if name == "global":
return RegionProp, name
if "/" not in name:
raise ValueError(f"Unexpected zonal name: {name}")
zonal_kind, zonal_name = name.split("/", maxsplit=1)
if zonal_kind == "regions":
return RegionProp, zonal_name
elif zonal_kind == "zones":
return InternalZoneProp, zonal_name
else:
raise ValueError(f"Unexpected zonal kind: {zonal_kind}")
40 changes: 25 additions & 15 deletions plugins/gcp/resoto_plugin_gcp/resources/base.py
Expand Up @@ -195,51 +195,54 @@ def add_node(self, node: GcpResourceType, source: Optional[Json] = None) -> Opti
node._cloud = self.cloud
node._account = self.project

self._standard_edges(node, source)

with self.graph_nodes_access:
self.graph.add_node(node, source=source or {})
return node
if self._standard_edges(node, source):
with self.graph_nodes_access:
self.graph.add_node(node, source=source or {})
return node
return None

def _standard_edges(self, node: GcpResourceType, source: Optional[Json] = None) -> None:
def _standard_edges(self, node: GcpResourceType, source: Optional[Json] = None) -> bool:
if isinstance(node, GcpRegion):
self.add_edge(node, node=self.project, reverse=True)
return
return True
if node._zone:
self.add_edge(node, node=node._zone, reverse=True)
return
return True
if node._region:
self.add_edge(node, node=node._region, reverse=True)
return
return True

if source is not None:
if InternalZoneProp in source:
if zone := self.zone_by_name.get(source[InternalZoneProp]):
node._zone = zone
node._region = self.region_by_zone_name[source[InternalZoneProp]]
self.add_edge(node, node=zone, reverse=True)
return
return True
else:
log.debug(f"Zone {source[InternalZoneProp]} not found for node: {node}.")
log.debug(f"Zone {source[InternalZoneProp]} not found for node: {node}. Ignore resource.")
return False

if RegionProp in source:
region_name = source[RegionProp].rsplit("/", 1)[-1]
if region := self.region_by_name.get(region_name):
node._region = region
self.add_edge(node, node=region, reverse=True)
return
return True
else:
log.debug(f"Region {region_name} not found for node: {node}.")
log.debug(f"Region {region_name} not found for node: {node}. Ignore resource.")
return False

# Fallback to GraphBuilder region, i.e. regional collection
if self.region is not None:
node._region = self.region
self.add_edge(node, node=self.region, reverse=True)
return
return True

# Fallback to global region
node._region = self.fallback_global_region
self.add_edge(node, node=self.fallback_global_region, reverse=True)
return
return True

def add_edge(
self,
Expand Down Expand Up @@ -395,6 +398,13 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
"""
pass

def post_process_instance(self, builder: GraphBuilder, source: Json) -> None:
"""
Hook method to post process the resource after all connections are done.
Default: do nothing.
"""
pass

@classmethod
def collect_resources(cls: Type[GcpResource], builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]:
# Default behavior: in case the class has an ApiSpec, call the api and call collect.
Expand Down
6 changes: 4 additions & 2 deletions plugins/gcp/resoto_plugin_gcp/resources/compute.py
Expand Up @@ -786,7 +786,7 @@ class GcpDiskType(GcpResource, BaseVolumeType):
"pd-standard": "PDStandard",
}

def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
def post_process_instance(self, builder: GraphBuilder, source: Json) -> None:
"""Adds edges from disk_types type to SKUs and determines ondemand pricing"""
if not self.name:
return
Expand Down Expand Up @@ -3421,7 +3421,9 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None:
# Add edge from machine type to accelerator type
for at in self.accelerators or []:
# The accelerator type resource name, not a full URL, e.g. nvidia-tesla-t4.
builder.add_edge(self, clazz=GcpAcceleratorType, id=at.guest_accelerator_type)
builder.add_edge(self, clazz=GcpAcceleratorType, id=at.guest_accelerator_type, reverse=True)

def post_process_instance(self, builder: GraphBuilder, source: Json) -> None:
# Adds edges from machine type to SKUs and determines ondemand pricing
if not self.name:
return
Expand Down
8 changes: 4 additions & 4 deletions plugins/gcp/test/files/database_instance.json
Expand Up @@ -18,7 +18,7 @@
"requireSsl": false
},
"locationPreference": {
"zone": "us-central1-b",
"zone": "us-east1-b",
"kind": "sql#locationPreference"
},
"databaseFlags": [
Expand Down Expand Up @@ -75,10 +75,10 @@
"serviceAccountEmailAddress": "foo",
"backendType": "SECOND_GEN",
"selfLink": "https://content-sqladmin.googleapis.com/v1/projects/inbound-axon-320811/instances/testdb",
"connectionName": "inbound-axon-320811:us-central1:testdb",
"connectionName": "inbound-axon-320811:us-east1:testdb",
"name": "testdb",
"region": "us-central1",
"gceZone": "us-central1-b",
"region": "us-east1",
"gceZone": "us-east1-b",
"databaseInstalledVersion": "MYSQL_8_0_26",
"maintenanceVersion": "MYSQL_8_0_26.R20230314.03_02",
"createTime": "2023-04-25T11:16:57.136Z"
Expand Down
2 changes: 1 addition & 1 deletion plugins/gcp/test/random_client.py
Expand Up @@ -62,7 +62,7 @@ def random_datetime() -> str:
"compute.zones.list": {"id": "regions", "items": []},
}
# dictionary keys under .items (used for aggregated list zone result) -> return zone ids
PredefinedDictKeys = {".items": [a.id for a in random_zones]}
PredefinedDictKeys = {".items": [f"zones/{a.id}" for a in random_zones]}

# type_name -> property_name -> callable that returns fixed value
FixtureReplies: Dict[str, Dict[str, Callable[[], JsonElement]]] = {}
Expand Down
8 changes: 5 additions & 3 deletions plugins/gcp/test/test_compute.py
Expand Up @@ -47,10 +47,12 @@ def test_disk_type_ondemand_cost(random_builder: GraphBuilder) -> None:
("pd-standard", "us-east1", 0.08),
]
with open(os.path.dirname(__file__) + "/files/skus.json") as f:
GcpSku.collect(raw=json.load(f)["skus"], builder=random_builder)
for r in GcpSku.collect(raw=json.load(f)["skus"], builder=random_builder):
r.post_process_instance(random_builder, {})

with open(os.path.dirname(__file__) + "/files/disk_type.json") as f:
GcpDiskType.collect(raw=json.load(f)["items"]["diskTypes"], builder=random_builder)
for r in GcpDiskType.collect(raw=json.load(f)["items"]["diskTypes"], builder=random_builder):
r.post_process_instance(random_builder, {})

regions = random_builder.resources_of(GcpRegion)
disk_types = random_builder.resources_of(GcpDiskType)
Expand Down Expand Up @@ -191,7 +193,7 @@ def test_machine_type_ondemand_cost(random_builder: GraphBuilder) -> None:
machine_type = next((obj for obj in machine_types if obj.name == price[0]), None)
assert machine_type
machine_type._region = region
machine_type.connect_in_graph(random_builder, {"Dummy": "Source"})
machine_type.post_process_instance(random_builder, {"Dummy": "Source"})
assert machine_type.ondemand_cost
assert round(machine_type.ondemand_cost, 5) == price[2]

Expand Down

0 comments on commit 267c5fd

Please sign in to comment.