def get_last_run() -> datetime:
    """Return the UTC start time recorded in the task data, or one hour ago as a fallback.

    The timestamp is looked up under ``timing -> <step> -> started_at`` in
    ``self.task_data``, where ``<step>`` is the task data's ``step`` entry
    (empty string when absent). When there is no task data, or the lookup
    yields ``None``, "now minus one hour" (UTC) is returned instead.
    """
    fallback = datetime.now(timezone.utc) - timedelta(hours=1)
    task_data = self.task_data
    # Only consult the timing information when task data is actually present.
    started_at = (
        value_in_path(task_data, ["timing", task_data.get("step", ""), "started_at"]) if task_data else None
    )
    return fallback if started_at is None else datetime.fromtimestamp(started_at, timezone.utc)
client.get_team_id() team = DigitalOceanTeam(id=team_id, tags={}, urn=f"do:team:{team_id}") @@ -97,7 +113,7 @@ def collect_team(self, client: StreamingWrapper, feedback: CoreFeedback) -> Opti try: feedback.progress_done(team_id, 0, 1) team_feedback = feedback.with_context("digitalocean", client.get_team_id()) - dopc = DigitalOceanTeamCollector(team, client.with_feedback(team_feedback)) + dopc = DigitalOceanTeamCollector(team, client.with_feedback(team_feedback), last_run) dopc.collect() feedback.progress_done(team_id, 1, 1) except Exception: diff --git a/plugins/digitalocean/resoto_plugin_digitalocean/client.py b/plugins/digitalocean/resoto_plugin_digitalocean/client.py index 8fa37112a..1e761d1d1 100644 --- a/plugins/digitalocean/resoto_plugin_digitalocean/client.py +++ b/plugins/digitalocean/resoto_plugin_digitalocean/client.py @@ -1,13 +1,14 @@ import logging from attrs import define from functools import lru_cache -from typing import List, Any, Optional, Union, TypeVar, Callable +from typing import List, Any, Optional, Union, TypeVar, Callable, Mapping import boto3 import requests from botocore.exceptions import EndpointConnectionError, HTTPClientError from retrying import retry as retry_decorator -from urllib.parse import urljoin +from urllib.parse import urljoin, urlencode +from datetime import datetime from resoto_plugin_digitalocean.utils import RetryableHttpError from resoto_plugin_digitalocean.utils import retry_on_error @@ -83,10 +84,15 @@ def check_status_code(self, response: requests.Response) -> bool: return False @retry - def _fetch(self, path: str, payload_object_name: str) -> List[Json]: + def _fetch(self, path: str, payload_object_name: str, query: Optional[Mapping[str, str]] = None) -> List[Json]: result: List[Json] = [] - url = f"{self.do_api_endpoint}{path}?page=1&per_page=200" + url = f"{self.do_api_endpoint}{path}" + params = {"page": "1", "per_page": "200"} + params.update(query or {}) + + url = f"{url}?{urlencode(params)}" + 
def get_droplet_cpu_usage(self, droplet_id: str, start: datetime, end: datetime) -> List[Json]:
    """Fetch the CPU metrics of one droplet for the window ``start``..``end``.

    The window bounds are sent as the string form of ``datetime.timestamp()``
    and the droplet is identified through the ``host_id`` query parameter.
    Returns whatever ``_fetch`` collects from the ``data`` payload object.
    """
    window = dict(host_id=droplet_id, start=str(start.timestamp()), end=str(end.timestamp()))
    return self._fetch("/monitoring/metrics/droplet/cpu", "data", window)

def get_droplet_memory_available(self, droplet_id: str, start: datetime, end: datetime) -> List[Json]:
    """Fetch the available-memory metrics of one droplet for the window ``start``..``end``.

    Uses the same query parameters as the CPU metric request (``host_id``,
    ``start``, ``end``) against the ``memory_available`` metrics endpoint and
    returns whatever ``_fetch`` collects from the ``data`` payload object.
    """
    window = dict(host_id=droplet_id, start=str(start.timestamp()), end=str(end.timestamp()))
    return self._fetch("/monitoring/metrics/droplet/memory_available", "data", window)
StreamingWrapper) -> None: + def __init__(self, team: DigitalOceanTeam, client: StreamingWrapper, last_run_started_at: datetime) -> None: self.client = client self.team = team + self.last_run_started_at = last_run_started_at # Mandatory collectors are always collected regardless of whether # they were included by --do-collect or excluded by --do-no-collect @@ -424,7 +430,7 @@ def add_edge(search_map_key: str, is_parent: bool) -> None: @metrics_collect_droplets.time() def collect_droplets(self) -> None: - instances = self.client.list_droplets() + droplets = self.client.list_droplets() def get_image(droplet: Json) -> Json: image = droplet["image"] @@ -440,7 +446,113 @@ def remove_duplicates(resources: List[Json], id_field: str) -> List[Json]: seen_ids.add(resource[id_field]) return unique - images = [get_image(instance) for instance in instances] + droplet_ids: List[int] = [droplet["id"] for droplet in droplets] + start_time = self.last_run_started_at + end_time = utc() + if end_time - start_time < timedelta(minutes=4): + start_time = end_time - timedelta(minutes=4) + + cpu_usage_metrics_results: Dict[int, Json] = {} + memory_usage_metrics_results: Dict[int, Json] = {} + + with futures.ThreadPoolExecutor(max_workers=10) as executor: + cpu_future_to_droplet_id = { + executor.submit( + self.client.get_droplet_cpu_usage, + str(d_id), + start_time, + end_time, # todo: make this configurable + ): d_id + for d_id in droplet_ids + } + memory_future_to_droplet_id = { + executor.submit( + self.client.get_droplet_memory_available, + str(d_id), + start_time, + end_time, # todo: make this configurable + ): d_id + for d_id in droplet_ids + } + + for f in futures.as_completed(cpu_future_to_droplet_id): + d_id = cpu_future_to_droplet_id[f] + results = f.result() + if results: + cpu_usage_metrics_results[d_id] = results[0] + for f in futures.as_completed(memory_future_to_droplet_id): + d_id = memory_future_to_droplet_id[f] + results = f.result() + if results: + 
def _min_avg_max(samples: List[float]) -> Dict[str, float]:
    """Return the min/avg/max of ``samples``, each rounded to three decimals."""
    return {
        "min": round(min(samples), 3),
        "avg": round(statistics.mean(samples), 3),
        "max": round(max(samples), 3),
    }

def get_cpu_utilization(metric_json: Dict[str, Any], num_cpu_cores: float) -> Dict[str, float]:
    """Summarize CPU utilization (in percent) from a droplet CPU metric response.

    Only the series whose ``metric.mode`` is ``"idle"`` is used. Each entry of
    its ``values`` is read as ``[unix_timestamp, counter]``; the counter is
    presumably cumulative idle CPU seconds (TODO confirm against the API
    docs). For every pair of consecutive samples the utilization is
    ``(1 - idle_delta / (interval_seconds * num_cpu_cores)) * 100``.

    Args:
        metric_json: One metric payload containing a ``result`` list of series.
        num_cpu_cores: Number of cores of the droplet; must be positive.

    Returns:
        A dict with the keys ``min``, ``avg`` and ``max``, rounded to three decimals.

    Raises:
        ValueError: If ``num_cpu_cores`` is not positive or fewer than two
            idle samples are available (no interval can be formed).
    """
    if num_cpu_cores <= 0:
        raise ValueError(f"invalid number of CPU cores: {num_cpu_cores}")

    # Keyed by timestamp so that duplicated samples collapse into one entry.
    idle_by_time: Dict[datetime, float] = {}
    for item in metric_json["result"]:
        if item["metric"]["mode"] != "idle":
            continue
        for sample in item["values"]:
            sampled_at = datetime.fromtimestamp(sample[0], tz=timezone.utc)
            idle_by_time[sampled_at] = float(sample[1])

    # Sort explicitly instead of relying on the order in which samples were delivered:
    # an out-of-order sample would otherwise be compared against the wrong neighbor.
    ordered = sorted(idle_by_time.items())

    utilization_data: List[float] = []
    for (previous_time, previous_idle), (current_time, current_idle) in zip(ordered, ordered[1:]):
        interval_seconds = (current_time - previous_time).total_seconds()
        idle_delta = current_idle - previous_idle
        utilization_data.append((1 - idle_delta / (interval_seconds * num_cpu_cores)) * 100)

    if not utilization_data:
        raise ValueError("at least two idle CPU samples are required to compute the utilization")

    return _min_avg_max(utilization_data)

def get_memory_utilization(metric_json: Dict[str, Any], total_mbytes: int) -> Dict[str, float]:
    """Summarize the memory metric of a droplet as a percentage of its total memory.

    Every sample value (``values[i][1]``, a byte count) of every series in
    ``metric_json["result"]`` is divided by the total memory in bytes and
    scaled to percent.

    NOTE(review): the caller feeds this function the "memory_available"
    metric, so the ratio looks like the share of memory that is available
    rather than the share in use -- confirm whether ``100 - ratio`` was
    intended before relying on the "utilization" naming.

    Args:
        metric_json: One metric payload containing a ``result`` list of series.
        total_mbytes: Total memory of the droplet in MiB; must be positive.

    Returns:
        A dict with the keys ``min``, ``avg`` and ``max``, rounded to three decimals.

    Raises:
        ValueError: If ``total_mbytes`` is not positive or the payload holds no samples.
    """
    if total_mbytes <= 0:
        raise ValueError(f"invalid total memory size: {total_mbytes}")

    total_bytes = total_mbytes * 1024 * 1024
    mem_utilization: List[float] = [
        (int(sample[1]) / total_bytes) * 100 for item in metric_json["result"] for sample in item["values"]
    ]

    if not mem_utilization:
        raise ValueError("no memory samples available to compute the utilization")

    return _min_avg_max(mem_utilization)
+ "cpu_utilization": cpu_utilizaton, + "memory_utilization": mem_utilization, + } + except Exception as e: + log.warning("Failed to get utilization metrics for droplet %s: %s", droplet["id"], e) + + images = [get_image(instance) for instance in droplets] images = remove_duplicates(images, "id") self.collect_resource( @@ -469,7 +581,7 @@ def get_size(droplet: Json) -> Json: size["region"] = droplet["region"]["slug"] return cast(Json, size) - sizes = [get_size(instance) for instance in instances] + sizes = [get_size(instance) for instance in droplets] sizes = remove_duplicates(sizes, "slug") self.collect_resource( @@ -495,7 +607,7 @@ def get_size(droplet: Json) -> Json: "archive": InstanceStatus.TERMINATED, } self.collect_resource( - instances, + droplets, resource_class=DigitalOceanDroplet, attr_map={ "id": lambda i: str(i["id"]), @@ -509,6 +621,7 @@ def get_size(droplet: Json) -> Json: "droplet_features": "features", "droplet_image": lambda d: d["image"]["slug"], "tags": lambda instance: parse_tags(instance.get("tags", []) or []), + "_resource_usage": lambda d: resource_usage.get(d["id"], {}), }, search_map={ "_region": [ @@ -531,8 +644,8 @@ def get_size(droplet: Json) -> Json: m = sha256() neighbors = sorted(neighbors) m.update(DigitalOceanDropletNeighborhood.kind.encode()) - for droplet in neighbors or []: - m.update(droplet.encode()) + for n in neighbors or []: + m.update(n.encode()) id = m.hexdigest()[0:16] neighbors_json.append( { @@ -540,7 +653,7 @@ def get_size(droplet: Json) -> Json: "id": id, } ) - instances_to_region = {str(droplet["id"]): region_id(droplet["region"]["slug"]) for droplet in instances} + instances_to_region = {str(droplet["id"]): region_id(droplet["region"]["slug"]) for droplet in droplets} self.collect_resource( neighbors_json, resource_class=DigitalOceanDropletNeighborhood, diff --git a/plugins/digitalocean/resoto_plugin_digitalocean/config.py b/plugins/digitalocean/resoto_plugin_digitalocean/config.py index c1e78ddb3..bb32611ad 100644 
--- a/plugins/digitalocean/resoto_plugin_digitalocean/config.py +++ b/plugins/digitalocean/resoto_plugin_digitalocean/config.py @@ -42,6 +42,7 @@ class DigitalOceanCollectorConfig: metadata={ "description": ( "DigitalOcean credentials for the teams to be collected. " + "Expected format: [{ 'api_token': 'foo', 'spaces_keys': {'access_key': 'bar', 'secret_key': 'baz'}}]. " "If provided, api_tokens and spaces_access_keys will be ignored" ) }, diff --git a/plugins/digitalocean/test/fixtures/__init__.py b/plugins/digitalocean/test/fixtures/__init__.py index c4a55155d..1ee32304b 100644 --- a/plugins/digitalocean/test/fixtures/__init__.py +++ b/plugins/digitalocean/test/fixtures/__init__.py @@ -26,3 +26,5 @@ from .domain_records import domain_records as domain_records from .firewalls import firewalls as firewalls from .alerts import alerts as alerts +from .cpu_metrics import cpu_metrics as cpu_metrics +from .memory_available import memory_available as memory_available diff --git a/plugins/digitalocean/test/fixtures/cpu_metrics.py b/plugins/digitalocean/test/fixtures/cpu_metrics.py new file mode 100644 index 000000000..976ca6ec5 --- /dev/null +++ b/plugins/digitalocean/test/fixtures/cpu_metrics.py @@ -0,0 +1,39 @@ +cpu_metrics = [ + { + "resultType": "matrix", + "result": [ + { + "metric": {"host_id": "289110074", "mode": "idle"}, + "values": [[1635386880, "122901.18"], [1635387000, "123020.92"], [1635387120, "123140.8"]], + }, + { + "metric": {"host_id": "289110074", "mode": "iowait"}, + "values": [[1635386880, "14.99"], [1635387000, "15.01"], [1635387120, "15.01"]], + }, + { + "metric": {"host_id": "289110074", "mode": "irq"}, + "values": [[1635386880, "0"], [1635387000, "0"], [1635387120, "0"]], + }, + { + "metric": {"host_id": "289110074", "mode": "nice"}, + "values": [[1635386880, "66.35"], [1635387000, "66.35"], [1635387120, "66.35"]], + }, + { + "metric": {"host_id": "289110074", "mode": "softirq"}, + "values": [[1635386880, "2.13"], [1635387000, "2.13"], 
[1635387120, "2.13"]], + }, + { + "metric": {"host_id": "289110074", "mode": "steal"}, + "values": [[1635386880, "7.89"], [1635387000, "7.9"], [1635387120, "7.91"]], + }, + { + "metric": {"host_id": "289110074", "mode": "system"}, + "values": [[1635386880, "140.09"], [1635387000, "140.2"], [1635387120, "140.23"]], + }, + { + "metric": {"host_id": "289110074", "mode": "user"}, + "values": [[1635386880, "278.57"], [1635387000, "278.65"], [1635387120, "278.69"]], + }, + ], + } +] diff --git a/plugins/digitalocean/test/fixtures/droplets.py b/plugins/digitalocean/test/fixtures/droplets.py index 8b877850d..5f604de1b 100644 --- a/plugins/digitalocean/test/fixtures/droplets.py +++ b/plugins/digitalocean/test/fixtures/droplets.py @@ -2,7 +2,7 @@ { "id": 289110074, "name": "ubuntu-s-1vcpu-1gb-fra1-01", - "memory": 1024, + "memory": 8096, "vcpus": 1, "disk": 25, "locked": False, @@ -180,7 +180,7 @@ { "id": 290075243, "name": "pool-1g2g56zow-u9fs4", - "memory": 2048, + "memory": 8096, "vcpus": 1, "disk": 50, "locked": False, diff --git a/plugins/digitalocean/test/fixtures/memory_available.py b/plugins/digitalocean/test/fixtures/memory_available.py new file mode 100644 index 000000000..5bd38e6c7 --- /dev/null +++ b/plugins/digitalocean/test/fixtures/memory_available.py @@ -0,0 +1,43 @@ +memory_available = [ + { + "resultType": "matrix", + "result": [ + { + "metric": {"host_id": "289110074"}, + "values": [ + [1691069400, "3834322944"], + [1691069520, "3828002816"], + [1691069640, "3840081920"], + [1691069760, "3826458624"], + [1691069880, "3849719808"], + [1691070000, "3860221952"], + [1691070120, "3850805248"], + [1691070240, "3847598080"], + [1691070360, "3854372864"], + [1691070480, "3838672896"], + [1691070600, "3839062016"], + [1691070720, "3852480512"], + [1691070840, "3861274624"], + [1691070960, "3853144064"], + [1691071080, "3865051136"], + [1691071200, "3857326080"], + [1691071320, "3852726272"], + [1691071440, "3847241728"], + [1691071560, "3835625472"], + 
[1691071680, "3826987008"], + [1691071800, "3845394432"], + [1691071920, "3842514944"], + [1691072040, "3836948480"], + [1691072160, "3848441856"], + [1691072280, "3847299072"], + [1691072400, "3842793472"], + [1691072520, "3840655360"], + [1691072640, "3839107072"], + [1691072760, "3846787072"], + [1691072880, "3856756736"], + [1691073000, "3849723904"], + ], + } + ], + } +] diff --git a/plugins/digitalocean/test/test_collector.py b/plugins/digitalocean/test/test_collector.py index 6aa896584..536feda43 100644 --- a/plugins/digitalocean/test/test_collector.py +++ b/plugins/digitalocean/test/test_collector.py @@ -33,6 +33,7 @@ ) from resotolib.baseresources import Cloud, EdgeType, GraphRoot, InstanceStatus, VolumeStatus from resotolib.core.actions import CoreFeedback +from resotolib.utils import utc from resotolib.graph import Graph from resotolib.graph import sanitize from .fixtures import ( @@ -61,6 +62,8 @@ domain_records, firewalls, alerts, + cpu_metrics, + memory_available, ) @@ -84,7 +87,7 @@ def wrapper(*args, **kwargs) -> Any: # type: ignore def prepare_graph(do_client: StreamingWrapper) -> Graph: cloud = Cloud(id="do") team = DigitalOceanTeam(id="test_team", urn="do:team:test_team") - plugin_instance = DigitalOceanTeamCollector(team, do_client) + plugin_instance = DigitalOceanTeamCollector(team, do_client, last_run_started_at=utc()) plugin_instance.collect() cloud_graph = Graph(root=cloud) graph = Graph(root=GraphRoot(id="root", tags={})) @@ -172,6 +175,8 @@ def test_collect_droplets() -> None: "list_vpcs": vpcs, "list_tags": tags, "list_droplets_neighbors_ids": neighbor_ids, + "get_droplet_cpu_usage": cpu_metrics, + "get_droplet_memory_available": memory_available, } ) graph = prepare_graph(do_client) @@ -209,7 +214,7 @@ def test_collect_droplets() -> None: assert droplet.urn == "do:droplet:289110074" assert droplet.name == "ubuntu-s-1vcpu-1gb-fra1-01" assert droplet.instance_type == "s-1vcpu-1gb" - assert droplet.instance_memory == 1 + assert 
droplet.instance_memory == 7.90625 assert droplet.instance_cores == 1 assert droplet.instance_status == InstanceStatus.RUNNING assert droplet.region().urn == "do:region:fra1" # type: ignore @@ -218,6 +223,14 @@ def test_collect_droplets() -> None: assert droplet.is_locked is False assert droplet.ctime == datetime.datetime(2022, 3, 3, 16, 26, 55, tzinfo=datetime.timezone.utc) assert droplet.tags == {"droplet_tag": None} + assert droplet._resource_usage == { + "cpu_utilization": {"min": 0.1, "avg": 0.158, "max": 0.217}, + "memory_utilization": { + "min": 45.074, + "avg": 45.301, + "max": 45.529, + }, + } neighborhood: DigitalOceanDropletNeighborhood = graph.search_first( "kind", DigitalOceanDropletNeighborhood.kind