Skip to content

Commit

Permalink
[feat] Add resource base data to the benchmark result (#1769)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Sep 8, 2023
1 parent efcaa85 commit 5773b50
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 63 deletions.
1 change: 1 addition & 0 deletions resotocore/resotocore/model/resolve_in_graph.py
Expand Up @@ -25,6 +25,7 @@ class NodePath:
to_node = ["to"]
edge_type = ["edge_type"]
ancestor_account_name = ["ancestors", "account", "reported", "name"]
ancestor_account_id = ["ancestors", "account", "reported", "id"]


@define(frozen=True)
Expand Down
3 changes: 3 additions & 0 deletions resotocore/resotocore/report/__init__.py
Expand Up @@ -118,6 +118,7 @@ def to_node(self) -> Json:
class CheckResult:
check: ReportCheck
number_of_resources_failing_by_account: Dict[str, int]
resources_failing_by_account: Dict[str, List[Json]]
node_id: str = field(init=False, factory=uuid_str)

@property
Expand All @@ -134,6 +135,7 @@ def to_node(self) -> Json:
reported["number_of_resources_failing"] = self.number_of_resources_failing
if self.number_of_resources_failing_by_account:
reported["number_of_resources_failing_by_account"] = self.number_of_resources_failing_by_account
reported["resources_failing_by_account"] = self.resources_failing_by_account
return dict(id=self.node_id, kind="report_check_result", type="node", reported=reported)

@staticmethod
Expand All @@ -156,6 +158,7 @@ def from_node(js: Json) -> CheckResult:
related=reported.get("related", []),
),
number_of_resources_failing_by_account=reported.get("number_of_resources_failing_by_account", {}),
resources_failing_by_account=reported.get("resources_failing_by_account", {}),
)


Expand Down
113 changes: 50 additions & 63 deletions resotocore/resotocore/report/inspector_service.py
@@ -1,9 +1,10 @@
import asyncio
import logging
from collections import defaultdict
from typing import Optional, List, Dict, Tuple, Callable, AsyncIterator

from aiostream import stream
from attr import evolve, define
from attr import define

from resotocore.analytics import CoreEvent
from resotocore.cli.model import CLIContext, CLI
Expand All @@ -13,7 +14,7 @@
from resotocore.ids import ConfigId, GraphName
from resotocore.model.model import Model
from resotocore.model.resolve_in_graph import NodePath
from resotocore.query.model import Aggregate, AggregateFunction, Query, P, AggregateVariable, AggregateVariableName
from resotocore.query.model import Query, P
from resotocore.report import (
Inspector,
ReportCheck,
Expand All @@ -33,13 +34,14 @@
ReportSeverityPriority,
)
from resotocore.report.report_config import ReportCheckCollectionConfig, BenchmarkConfig
from resotocore.service import Service
from resotocore.types import Json
from resotocore.util import value_in_path
from resotocore.service import Service
from resotolib.json_bender import Bender, S, bend

log = logging.getLogger(__name__)

CountByAccount = Dict[str, int]
SingleCheckResult = Dict[str, List[Json]]


def benchmark_id(name: str) -> ConfigId:
Expand All @@ -64,6 +66,23 @@ def includes_severity(self, severity: ReportSeverity) -> bool:
return ReportSeverityPriority[self.severity] <= ReportSeverityPriority[severity]


# This defines the subset of the data provided for every resource
ReportResourceData: Dict[str, Bender] = {
"node_id": S("id"),
"id": S("reported", "id"),
"name": S("reported", "name"),
"kind": S("reported", "kind"),
"tags": S("reported", "tags"),
"ctime": S("reported", "ctime"),
"atime": S("reported", "atime"),
"mtime": S("reported", "mtime"),
"cloud": S("ancestors", "cloud", "reported", "name"),
"account": S("ancestors", "account", "reported", "name"),
"region": S("ancestors", "region", "reported", "name"),
"zone": S("ancestors", "zone", "reported", "name"),
}


class InspectorService(Inspector, Service):
def __init__(self, cli: CLI) -> None:
super().__init__()
Expand Down Expand Up @@ -191,23 +210,24 @@ async def filter_checks(self, report_filter: Optional[Callable[[ReportCheck], bo
async def list_failing_resources(
self, graph: GraphName, check_uid: str, account_ids: Optional[List[str]] = None
) -> AsyncIterator[Json]:
# create context
context = CheckContext(accounts=account_ids)
return await self.__list_failing_resources(graph, check_uid, context)

async def __list_failing_resources(
self, graph: GraphName, check_uid: str, context: CheckContext
) -> AsyncIterator[Json]:
# get check
checks = await self.list_checks(check_ids=[check_uid])
if not checks:
raise NotFoundError(f"Check {check_uid} not found")
inspection = checks[0]
# load model
model = await self.model_handler.load_model(graph)
inspection = checks[0]
# load configuration
cfg_entity = await self.config_handler.get_config(ResotoReportValues)
cfg = cfg_entity.config if cfg_entity else {}
return await self.__list_failing_resources(graph, model, inspection, cfg, context)

async def __list_failing_resources(
self, graph: GraphName, model: Model, inspection: ReportCheck, config: Json, context: CheckContext
) -> AsyncIterator[Json]:
# final environment: defaults are coming from the check and are eventually overriden in the config
env = inspection.environment(cfg)
env = inspection.environment(config)
account_id_prop = "ancestors.account.reported.id"
# if the result kind is an account, we need to use the id directly instead of walking the graph
if (result_kind := model.get(inspection.result_kind)) and "account" in result_kind.kind_hierarchy():
Expand Down Expand Up @@ -247,13 +267,16 @@ async def __perform_benchmark(

perform_checks = await self.list_checks(check_ids=benchmark.nested_checks())
check_by_id = {c.id: c for c in perform_checks if context.includes_severity(c.severity)}
result = await self.__perform_checks(graph, perform_checks, context)
results = await self.__perform_checks(graph, perform_checks, context)
await self.event_sender.core_event(CoreEvent.BenchmarkPerformed, {"benchmark": benchmark.id})

def to_result(cc: CheckCollection) -> CheckCollectionResult:
check_results = [
CheckResult(check_by_id[cid], result.get(cid, {})) for cid in cc.checks or [] if cid in check_by_id
]
check_results = []
for cid in cc.checks or []:
if (check := check_by_id.get(cid)) is not None:
result = results.get(cid, {})
count_by_account = {uid: len(failed) for uid, failed in result.items()}
check_results.append(CheckResult(check, count_by_account, result))
children = [to_result(c) for c in cc.children or []]
return CheckCollectionResult(
cc.title, cc.description, documentation=cc.documentation, checks=check_results, children=children
Expand All @@ -275,14 +298,14 @@ def to_result(cc: CheckCollection) -> CheckCollectionResult:

async def __perform_checks(
self, graph: GraphName, checks: List[ReportCheck], context: CheckContext
) -> Dict[str, CountByAccount]:
) -> Dict[str, SingleCheckResult]:
# load model
model = await self.model_handler.load_model(graph)
# load configuration
cfg_entity = await self.config_handler.get_config(ResotoReportValues)
cfg = cfg_entity.config if cfg_entity else {}

async def perform_single(check: ReportCheck) -> Tuple[str, CountByAccount]:
async def perform_single(check: ReportCheck) -> Tuple[str, SingleCheckResult]:
return check.id, await self.__perform_check(graph, model, check, cfg, context)

async with stream.map(
Expand All @@ -292,52 +315,16 @@ async def perform_single(check: ReportCheck) -> Tuple[str, CountByAccount]:

async def __perform_check(
self, graph: GraphName, model: Model, inspection: ReportCheck, config: Json, context: CheckContext
) -> CountByAccount:
# final environment: defaults are coming from the check and are eventually overriden in the config
env = inspection.environment(config)
account_id_prop = "ancestors.account.reported.id"
) -> SingleCheckResult:
resources_by_account = defaultdict(list)
# if the result kind is an account, we need to use the id directly instead of walking the graph
if (result_kind := model.get(inspection.result_kind)) and "account" in result_kind.kind_hierarchy():
account_id_prop = "reported.id"

async def perform_search(search: str) -> CountByAccount:
# parse query
query = await self.template_expander.parse_query(search, on_section="reported", **env)
# filter only relevant accounts if provided
if context.accounts:
query = Query.by(P.single(account_id_prop).is_in(context.accounts)).combine(query)
# add aggregation to only query for count
ag_var = AggregateVariable(AggregateVariableName(account_id_prop), "account_id")
ag_fn = AggregateFunction("sum", 1, [], "count")
query = evolve(query, aggregate=Aggregate([ag_var], [ag_fn]))
account_result: CountByAccount = {}
async with await self.db_access.get_graph_db(graph).search_aggregation(QueryModel(query, model)) as ctx:
async for result in ctx:
account_result[result["group"]["account_id"]] = result["count"] or 0
return account_result

async def perform_cmd(cmd: str) -> CountByAccount:
# filter only relevant accounts if provided
if context.accounts:
account_list = ",".join(f'"{a}"' for a in context.accounts)
cmd = f"search /{account_id_prop} in [{account_list}] | " + cmd
# aggregate by account
aggregate = f"aggregate /{account_id_prop} as account_id: sum(1) as count"
cli_result = await self.cli.execute_cli_command(f"{cmd} | {aggregate}", stream.list, CLIContext(env=env))
account_result: CountByAccount = {}
for result in cli_result[0]:
account_result[result["group"]["account_id"]] = result["count"] or 0
return account_result

if resoto_search := inspection.detect.get("resoto"):
return await perform_search(resoto_search)
elif resoto_cmd := inspection.detect.get("resoto_cmd"):
return await perform_cmd(resoto_cmd)
elif inspection.detect.get("manual"):
# let's assume the manual check is successful
return {}
else:
raise ValueError(f"Invalid inspection {inspection.id}: no resoto or resoto_cmd defined")
is_account = (rk := model.get(inspection.result_kind)) and "account" in rk.kind_hierarchy()
account_id_path = NodePath.reported_id if is_account else NodePath.ancestor_account_id
async for resource in await self.__list_failing_resources(graph, model, inspection, config, context):
account_id = value_in_path(resource, account_id_path)
if account_id:
resources_by_account[account_id].append(bend(ReportResourceData, resource))
return resources_by_account

async def __list_accounts(self, benchmark: Benchmark, graph: GraphName) -> List[str]:
model = await self.model_handler.load_model(graph)
Expand Down
1 change: 1 addition & 0 deletions resotocore/resotocore/web/directives.py
Expand Up @@ -79,6 +79,7 @@ async def metrics_handler(request: Request, handler: RequestHandler) -> StreamRe
raise ex
finally:
resp_time = perf_now() - request["start_time"]
log.debug(f"Request {request} took {resp_time} ms")
RequestLatency.labels(request.path).observe(resp_time)
RequestInProgress.labels(request.path, request.method).dec()

Expand Down
2 changes: 2 additions & 0 deletions resotocore/tests/resotocore/report/inspector_service_test.py
Expand Up @@ -108,7 +108,9 @@ async def test_perform_benchmark(inspector_service_with_test_benchmark: Inspecto
assert result.checks[1].number_of_resources_failing == 10
filtered = result.filter_result(filter_failed=True)
assert filtered.checks[0].number_of_resources_failing == 10
assert len(filtered.checks[0].resources_failing_by_account["sub_root"]) == 10
assert filtered.checks[1].number_of_resources_failing == 10
assert len(filtered.checks[1].resources_failing_by_account["sub_root"]) == 10
passing, failing = result.passing_failing_checks_for_account("sub_root")
assert len(passing) == 0
assert len(failing) == 2
Expand Down

0 comments on commit 5773b50

Please sign in to comment.