Skip to content

Commit

Permalink
[plugins/digitalocean][feat] Collect CPU/Memory Utilization metrics (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
meln1k committed Aug 4, 2023
1 parent 2540a2b commit 883ef21
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 20 deletions.
22 changes: 19 additions & 3 deletions plugins/digitalocean/resoto_plugin_digitalocean/__init__.py
Expand Up @@ -15,7 +15,9 @@
from resotolib.logger import log
from resotolib.graph import Graph
from resotolib.baseresources import BaseResource
from resotolib.json import value_in_path
import time
from datetime import datetime, timezone, timedelta


class DigitalOceanCollectorPlugin(BaseCollectorPlugin):
Expand All @@ -37,6 +39,20 @@ def collect(self) -> None:

assert self.core_feedback, "core_feedback is not set" # will be set by the outer collector plugin

def get_last_run() -> datetime:
    """Return the start time of the previous collect run.

    Reads the "started_at" timestamp for the current step from the task
    data; falls back to one hour ago when no task data (or no recorded
    timestamp) is available.
    """
    fallback = datetime.now(timezone.utc) - timedelta(hours=1)
    task_data = self.task_data
    if task_data:
        started_at = value_in_path(task_data, ["timing", task_data.get("step", ""), "started_at"])
        if started_at is not None:
            return datetime.fromtimestamp(started_at, timezone.utc)
    return fallback

last_run_started_at = get_last_run()

def from_legacy_config() -> List[DigitalOceanTeamCredentials]:
tokens: List[str] = Config.digitalocean.api_tokens
spaces_access_keys: List[str] = Config.digitalocean.spaces_access_keys
Expand Down Expand Up @@ -85,19 +101,19 @@ def key_to_tuple(key: str) -> Tuple[str, str]:
c.spaces_keys.access_key if c.spaces_keys else None,
c.spaces_keys.secret_key if c.spaces_keys else None,
)
team_graph = self.collect_team(client, self.core_feedback.with_context("digitalocean"))
team_graph = self.collect_team(client, self.core_feedback.with_context("digitalocean"), last_run_started_at)
if team_graph:
self.send_account_graph(team_graph)

def collect_team(self, client: StreamingWrapper, feedback: CoreFeedback) -> Optional[Graph]:
def collect_team(self, client: StreamingWrapper, feedback: CoreFeedback, last_run: datetime) -> Optional[Graph]:
"""Collects an individual team."""
team_id = client.get_team_id()
team = DigitalOceanTeam(id=team_id, tags={}, urn=f"do:team:{team_id}")

try:
feedback.progress_done(team_id, 0, 1)
team_feedback = feedback.with_context("digitalocean", client.get_team_id())
dopc = DigitalOceanTeamCollector(team, client.with_feedback(team_feedback))
dopc = DigitalOceanTeamCollector(team, client.with_feedback(team_feedback), last_run)
dopc.collect()
feedback.progress_done(team_id, 1, 1)
except Exception:
Expand Down
22 changes: 18 additions & 4 deletions plugins/digitalocean/resoto_plugin_digitalocean/client.py
@@ -1,13 +1,14 @@
import logging
from attrs import define
from functools import lru_cache
from typing import List, Any, Optional, Union, TypeVar, Callable
from typing import List, Any, Optional, Union, TypeVar, Callable, Mapping

import boto3
import requests
from botocore.exceptions import EndpointConnectionError, HTTPClientError
from retrying import retry as retry_decorator
from urllib.parse import urljoin
from urllib.parse import urljoin, urlencode
from datetime import datetime

from resoto_plugin_digitalocean.utils import RetryableHttpError
from resoto_plugin_digitalocean.utils import retry_on_error
Expand Down Expand Up @@ -83,10 +84,15 @@ def check_status_code(self, response: requests.Response) -> bool:
return False

@retry
def _fetch(self, path: str, payload_object_name: str) -> List[Json]:
def _fetch(self, path: str, payload_object_name: str, query: Optional[Mapping[str, str]] = None) -> List[Json]:
result: List[Json] = []

url = f"{self.do_api_endpoint}{path}?page=1&per_page=200"
url = f"{self.do_api_endpoint}{path}"
params = {"page": "1", "per_page": "200"}
params.update(query or {})

url = f"{url}?{urlencode(params)}"

log.debug(f"fetching {url}")

def validate_status(response: requests.Response) -> requests.Response:
Expand Down Expand Up @@ -349,6 +355,14 @@ def list_firewalls(self) -> List[Json]:
def list_alert_policies(self) -> List[Json]:
return self._fetch("/monitoring/alerts", "policies")

def get_droplet_cpu_usage(self, droplet_id: str, start: datetime, end: datetime) -> List[Json]:
    """Fetch the raw per-mode CPU time series for a droplet over [start, end]."""
    window = {
        "host_id": droplet_id,
        "start": str(start.timestamp()),
        "end": str(end.timestamp()),
    }
    return self._fetch("/monitoring/metrics/droplet/cpu", "data", window)

def get_droplet_memory_available(self, droplet_id: str, start: datetime, end: datetime) -> List[Json]:
    """Fetch the available-memory time series (bytes) for a droplet over [start, end]."""
    window = {
        "host_id": droplet_id,
        "start": str(start.timestamp()),
        "end": str(end.timestamp()),
    }
    return self._fetch("/monitoring/metrics/droplet/memory_available", "data", window)


TeamId = str

Expand Down
131 changes: 122 additions & 9 deletions plugins/digitalocean/resoto_plugin_digitalocean/collector.py
@@ -1,13 +1,18 @@
import logging
import math
from pprint import pformat
from typing import Tuple, Type, List, Dict, Callable, Any, Optional, cast
from typing import Tuple, Type, List, Dict, Callable, Any, Optional, cast, DefaultDict
from datetime import datetime, timedelta, timezone
from concurrent import futures
from collections import defaultdict
import statistics

from prometheus_client import Summary

from resotolib.baseresources import BaseResource, EdgeType, InstanceStatus, VolumeStatus
from resotolib.graph import Graph
from resotolib.types import Json
from resotolib.utils import utc
from hashlib import sha256
from .client import StreamingWrapper
from .resources import (
Expand Down Expand Up @@ -171,9 +176,10 @@ class DigitalOceanTeamCollector:
all DigitalOcean resources
"""

def __init__(self, team: DigitalOceanTeam, client: StreamingWrapper) -> None:
def __init__(self, team: DigitalOceanTeam, client: StreamingWrapper, last_run_started_at: datetime) -> None:
self.client = client
self.team = team
self.last_run_started_at = last_run_started_at

# Mandatory collectors are always collected regardless of whether
# they were included by --do-collect or excluded by --do-no-collect
Expand Down Expand Up @@ -424,7 +430,7 @@ def add_edge(search_map_key: str, is_parent: bool) -> None:

@metrics_collect_droplets.time()
def collect_droplets(self) -> None:
instances = self.client.list_droplets()
droplets = self.client.list_droplets()

def get_image(droplet: Json) -> Json:
image = droplet["image"]
Expand All @@ -440,7 +446,113 @@ def remove_duplicates(resources: List[Json], id_field: str) -> List[Json]:
seen_ids.add(resource[id_field])
return unique

images = [get_image(instance) for instance in instances]
droplet_ids: List[int] = [droplet["id"] for droplet in droplets]
start_time = self.last_run_started_at
end_time = utc()
if end_time - start_time < timedelta(minutes=4):
start_time = end_time - timedelta(minutes=4)

cpu_usage_metrics_results: Dict[int, Json] = {}
memory_usage_metrics_results: Dict[int, Json] = {}

with futures.ThreadPoolExecutor(max_workers=10) as executor:
cpu_future_to_droplet_id = {
executor.submit(
self.client.get_droplet_cpu_usage,
str(d_id),
start_time,
end_time, # todo: make this configurable
): d_id
for d_id in droplet_ids
}
memory_future_to_droplet_id = {
executor.submit(
self.client.get_droplet_memory_available,
str(d_id),
start_time,
end_time, # todo: make this configurable
): d_id
for d_id in droplet_ids
}

for f in futures.as_completed(cpu_future_to_droplet_id):
d_id = cpu_future_to_droplet_id[f]
results = f.result()
if results:
cpu_usage_metrics_results[d_id] = results[0]
for f in futures.as_completed(memory_future_to_droplet_id):
d_id = memory_future_to_droplet_id[f]
results = f.result()
if results:
memory_usage_metrics_results[d_id] = results[0]

def get_cpu_utilization(metric_json: "Json", num_cpu_cores: float) -> Dict[str, float]:
    """Compute min/avg/max CPU utilization (percent) from a monitoring response.

    The monitoring API reports cumulative CPU seconds per mode. Only the
    "idle" mode series is used: for each pair of consecutive samples,
    utilization is the share of wall-clock capacity (interval * cores)
    that was NOT spent idle.

    Raises (ValueError/StatisticsError) when fewer than two samples are
    present; the caller wraps this in a try/except and skips the droplet.
    """
    # timestamp -> {mode: cumulative seconds}; only "idle" survives the filter.
    samples: DefaultDict[datetime, Dict[str, float]] = defaultdict(dict)
    for series in metric_json["result"]:
        mode = series["metric"]["mode"]
        if mode != "idle":
            continue
        for point in series["values"]:
            ts = datetime.fromtimestamp(point[0], tz=timezone.utc)
            samples[ts][mode] = float(point[1])

    previous_data: Dict[str, float] = {}
    previous_time: Optional[datetime] = None
    utilization_data: List[float] = []

    for timestamp, data in samples.items():
        # Fix: use the summed delta over all retained modes. The original
        # divided by the LAST mode's delta, which only happened to be correct
        # because a single mode ("idle") survives the filter above.
        delta_total = sum(value - previous_data.get(mode, value) for mode, value in data.items())
        if previous_time is not None:
            interval_seconds = (timestamp - previous_time).total_seconds()
            utilization = (1 - delta_total / (interval_seconds * num_cpu_cores)) * 100
            utilization_data.append(utilization)
        previous_data = data
        previous_time = timestamp

    return {
        "min": round(min(utilization_data), 3),
        "avg": round(statistics.mean(utilization_data), 3),
        "max": round(max(utilization_data), 3),
    }

def get_memory_utilization(metric_json: "Json", total_mbytes: int) -> Dict[str, float]:
    """Turn a memory_available time series into min/avg/max percentages of total memory.

    NOTE(review): this reports AVAILABLE memory as a share of total; if
    "utilization" is meant as used memory, the value should be inverted —
    confirm against the DO metrics API.
    """
    total_bytes = total_mbytes * 1024 * 1024
    # percentage of total memory at every sample point of every series
    percentages: List[float] = [
        (int(sample[1]) / total_bytes) * 100
        for series in metric_json["result"]
        for sample in series["values"]
    ]
    return {
        "min": round(min(percentages), 3),
        "avg": round(statistics.mean(percentages), 3),
        "max": round(max(percentages), 3),
    }

resource_usage: Dict[int, Dict[str, Dict[str, float]]] = {}

for droplet in droplets:
try:
cpu_utilizaton = get_cpu_utilization(cpu_usage_metrics_results[droplet["id"]], droplet["vcpus"])
mem_utilization = get_memory_utilization(memory_usage_metrics_results[droplet["id"]], droplet["memory"])
resource_usage[droplet["id"]] = {
"cpu_utilization": cpu_utilizaton,
"memory_utilization": mem_utilization,
}
except Exception as e:
log.warning("Failed to get utilization metrics for droplet %s: %s", droplet["id"], e)

images = [get_image(instance) for instance in droplets]
images = remove_duplicates(images, "id")

self.collect_resource(
Expand Down Expand Up @@ -469,7 +581,7 @@ def get_size(droplet: Json) -> Json:
size["region"] = droplet["region"]["slug"]
return cast(Json, size)

sizes = [get_size(instance) for instance in instances]
sizes = [get_size(instance) for instance in droplets]
sizes = remove_duplicates(sizes, "slug")

self.collect_resource(
Expand All @@ -495,7 +607,7 @@ def get_size(droplet: Json) -> Json:
"archive": InstanceStatus.TERMINATED,
}
self.collect_resource(
instances,
droplets,
resource_class=DigitalOceanDroplet,
attr_map={
"id": lambda i: str(i["id"]),
Expand All @@ -509,6 +621,7 @@ def get_size(droplet: Json) -> Json:
"droplet_features": "features",
"droplet_image": lambda d: d["image"]["slug"],
"tags": lambda instance: parse_tags(instance.get("tags", []) or []),
"_resource_usage": lambda d: resource_usage.get(d["id"], {}),
},
search_map={
"_region": [
Expand All @@ -531,16 +644,16 @@ def get_size(droplet: Json) -> Json:
m = sha256()
neighbors = sorted(neighbors)
m.update(DigitalOceanDropletNeighborhood.kind.encode())
for droplet in neighbors or []:
m.update(droplet.encode())
for n in neighbors or []:
m.update(n.encode())
id = m.hexdigest()[0:16]
neighbors_json.append(
{
"droplets": neighbors,
"id": id,
}
)
instances_to_region = {str(droplet["id"]): region_id(droplet["region"]["slug"]) for droplet in instances}
instances_to_region = {str(droplet["id"]): region_id(droplet["region"]["slug"]) for droplet in droplets}
self.collect_resource(
neighbors_json,
resource_class=DigitalOceanDropletNeighborhood,
Expand Down
1 change: 1 addition & 0 deletions plugins/digitalocean/resoto_plugin_digitalocean/config.py
Expand Up @@ -42,6 +42,7 @@ class DigitalOceanCollectorConfig:
metadata={
"description": (
"DigitalOcean credentials for the teams to be collected. "
"Expected format: [{ 'api_token': 'foo', 'spaces_keys': {'access_key': 'bar', 'secret_key': 'baz'}}]. "
"If provided, api_tokens and spaces_access_keys will be ignored"
)
},
Expand Down
2 changes: 2 additions & 0 deletions plugins/digitalocean/test/fixtures/__init__.py
Expand Up @@ -26,3 +26,5 @@
from .domain_records import domain_records as domain_records
from .firewalls import firewalls as firewalls
from .alerts import alerts as alerts
from .cpu_metrics import cpu_metrics as cpu_metrics
from .memory_available import memory_available as memory_available
39 changes: 39 additions & 0 deletions plugins/digitalocean/test/fixtures/cpu_metrics.py
@@ -0,0 +1,39 @@
# Test fixture: the "data" payload of a DigitalOcean
# /v2/monitoring/metrics/droplet/cpu response for one droplet (host_id
# 289110074). Each entry under "result" is one CPU mode's time series of
# [unix_timestamp, cumulative-seconds-as-string] pairs. Kept byte-stable so
# collector tests computing utilization from it stay deterministic.
cpu_metrics = [
    {
        "resultType": "matrix",
        "result": [
            {
                "metric": {"host_id": "289110074", "mode": "idle"},
                "values": [[1635386880, "122901.18"], [1635387000, "123020.92"], [1635387120, "123140.8"]],
            },
            {
                "metric": {"host_id": "289110074", "mode": "iowait"},
                "values": [[1635386880, "14.99"], [1635387000, "15.01"], [1635387120, "15.01"]],
            },
            {
                "metric": {"host_id": "289110074", "mode": "irq"},
                "values": [[1635386880, "0"], [1635387000, "0"], [1635387120, "0"]],
            },
            {
                "metric": {"host_id": "289110074", "mode": "nice"},
                "values": [[1635386880, "66.35"], [1635387000, "66.35"], [1635387120, "66.35"]],
            },
            {
                "metric": {"host_id": "289110074", "mode": "softirq"},
                "values": [[1635386880, "2.13"], [1635387000, "2.13"], [1635387120, "2.13"]],
            },
            {
                "metric": {"host_id": "289110074", "mode": "steal"},
                "values": [[1635386880, "7.89"], [1635387000, "7.9"], [1635387120, "7.91"]],
            },
            {
                "metric": {"host_id": "289110074", "mode": "system"},
                "values": [[1635386880, "140.09"], [1635387000, "140.2"], [1635387120, "140.23"]],
            },
            {
                "metric": {"host_id": "289110074", "mode": "user"},
                "values": [[1635386880, "278.57"], [1635387000, "278.65"], [1635387120, "278.69"]],
            },
        ],
    }
]
4 changes: 2 additions & 2 deletions plugins/digitalocean/test/fixtures/droplets.py
Expand Up @@ -2,7 +2,7 @@
{
"id": 289110074,
"name": "ubuntu-s-1vcpu-1gb-fra1-01",
"memory": 1024,
"memory": 8096,
"vcpus": 1,
"disk": 25,
"locked": False,
Expand Down Expand Up @@ -180,7 +180,7 @@
{
"id": 290075243,
"name": "pool-1g2g56zow-u9fs4",
"memory": 2048,
"memory": 8096,
"vcpus": 1,
"disk": 50,
"locked": False,
Expand Down

0 comments on commit 883ef21

Please sign in to comment.