From b8c516cd8ed014d7bdd9637913dbfd8adcd5e8b1 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Fri, 3 Nov 2023 16:36:06 +0100 Subject: [PATCH] [resotocore][feat] Provide possible values for attributes and values (#1814) --- resotocore/resotocore/db/arango_query.py | 78 ++++++++++- resotocore/resotocore/db/graphdb.py | 48 ++++++- resotocore/resotocore/static/api-doc.yaml | 123 ++++++++++++++++++ resotocore/resotocore/web/api.py | 28 +++- .../tests/resotocore/db/arango_query_test.py | 87 ++++++++++++- .../tests/resotocore/db/graphdb_test.py | 21 ++- 6 files changed, 376 insertions(+), 9 deletions(-) diff --git a/resotocore/resotocore/db/arango_query.py b/resotocore/resotocore/db/arango_query.py index 14e4f1c66..8eac14591 100644 --- a/resotocore/resotocore/db/arango_query.py +++ b/resotocore/resotocore/db/arango_query.py @@ -3,7 +3,7 @@ import re from collections import defaultdict from textwrap import dedent -from typing import Union, List, Tuple, Any, Optional, Dict, Set +from typing import Union, List, Tuple, Any, Optional, Dict, Set, Literal from arango.typings import Json from attrs import evolve @@ -751,6 +751,82 @@ def ft_term(cursor: str, ab_term: Term) -> str: return resulting_cursor, query_str +def possible_values( + db: Any, + query: QueryModel, + path_or_predicate: Union[str, Predicate], + detail: Literal["attributes", "values"], + limit: Optional[int] = None, + skip: Optional[int] = None, +) -> Tuple[str, Json]: + path = path_or_predicate if isinstance(path_or_predicate, str) else path_or_predicate.name + counters: Dict[str, int] = defaultdict(lambda: 0) + + def next_counter(name: str) -> int: + count = counters[name] + counters[name] = count + 1 + return count + + def next_crs(name: str = "m") -> str: + return f"{name}{next_counter(name)}" + + bind_vars: Json = {} + start = f"`{db.vertex_name}`" + cursor, query_str = query_string(db, query.query, query, start, False, bind_vars, counters, id_column="_key") + + # iterate over the result + let_cursor = next_crs() + query_str += f" LET {let_cursor} = (" + next_cursor = next_crs() + query_str += f" FOR {next_cursor} in {cursor}" + cursor = next_cursor + + # expand array paths + ars = [a.lstrip(".") for a in array_marker_in_path_regexp.split(path)] + prop_name = None if path.endswith("[]") or path.endswith("[*]") else ars.pop() + for ar in ars: + nxt_crs = next_crs() + query_str += f" FOR {nxt_crs} IN TO_ARRAY({cursor}.{ar})" + cursor = nxt_crs + access_path = f"{cursor}.{prop_name}" if prop_name is not None else cursor + + # access the detail + if detail == "attributes": + cursor = next_crs() + query_str += ( + f" FILTER IS_OBJECT({access_path}) FOR {cursor} IN ATTRIBUTES({access_path}, true) RETURN {cursor})" + ) + elif detail == "values": + query_str += f" RETURN {access_path})" + else: + raise AttributeError(f"Unknown detail: {detail}") + + # result stream of matching entries: filter and sort + sorted_let = next_crs() + next_cursor = next_crs() + query_str += f" LET {sorted_let} = (FOR {next_cursor} IN {let_cursor} FILTER {next_cursor}!=null" + cursor = next_cursor + if isinstance(path_or_predicate, Predicate): + p: Predicate = path_or_predicate + bvn = f'b{next_counter("bind_vars")}' + prop = query.model.property_by_path(Section.without_section(p.name)) + pk = prop.kind + op = lgt_ops[p.op] if prop.simple_kind.reverse_order and p.op in lgt_ops else p.op + bind_vars[bvn] = [pk.coerce(a) for a in p.value] if isinstance(p.value, list) else pk.coerce(p.value) + if op == "=~": # use regex_test to do case-insensitive matching + query_str += f" FILTER REGEX_TEST({cursor}, @{bvn}, true)" + else: + query_str += f" FILTER {cursor} {op} @{bvn}" + query_str += f" RETURN DISTINCT {cursor})" + cursor = sorted_let + next_cursor = next_crs() + query_str += f"FOR {next_cursor} IN {cursor} SORT {next_cursor} ASC" + if limit: + query_str += f" LIMIT {skip if skip else 0}, {limit}" + query_str += f" RETURN {next_cursor}" + return query_str, bind_vars + + async def query_cost(graph_db: Any, model: QueryModel, with_edges: bool) -> EstimatedSearchCost: q_string, bind = to_query(graph_db, model, with_edges=with_edges) nr_nodes = await graph_db.db.count(graph_db.vertex_name) diff --git a/resotocore/resotocore/db/graphdb.py b/resotocore/resotocore/db/graphdb.py index 54f0c67e9..4a0b55eef 100644 --- a/resotocore/resotocore/db/graphdb.py +++ b/resotocore/resotocore/db/graphdb.py @@ -20,6 +20,8 @@ TypeVar, cast, AsyncIterator, + Literal, + Union, ) from aiostream import stream @@ -51,7 +53,7 @@ synthetic_metadata_kinds, ) from resotocore.model.resolve_in_graph import NodePath, GraphResolver -from resotocore.query.model import Query, FulltextTerm, MergeTerm, P +from resotocore.query.model import Query, FulltextTerm, MergeTerm, P, Predicate from resotocore.report import ReportSeverity from resotocore.types import JsonElement, EdgeType from resotocore.util import first, value_in_path_get, utc_str, uuid_str, value_in_path, json_hash, set_value_in_path @@ -160,6 +162,19 @@ async def search_history( ) -> AsyncCursorContext: pass + @abstractmethod + async def list_possible_values( + self, + query: QueryModel, + path_or_predicate: Union[str, Predicate], + part: Literal["attributes", "values"], + limit: Optional[int] = None, + skip: Optional[int] = None, + with_count: bool = False, + timeout: Optional[timedelta] = None, + ) -> AsyncCursorContext: + pass + @abstractmethod async def search_graph_gen( self, query: QueryModel, with_count: bool = False, timeout: Optional[timedelta] = None @@ -528,6 +543,25 @@ async def by_id_with(self, db: AsyncArangoDBBase, node_id: NodeId) -> Optional[J with await db.aql(query=self.query_node_by_id(), bind_vars={"rid": node_id}) as cursor: return cursor.next() if not cursor.empty() else None + async def list_possible_values( + self, + query: QueryModel, + path_or_predicate: Union[str, Predicate], + part: Literal["attributes", "values"], + limit: Optional[int] = None, + skip: Optional[int] = None, + with_count: bool = False, + timeout: Optional[timedelta] = None, + ) -> AsyncCursorContext: + q_string, bind = arango_query.possible_values(self, query, path_or_predicate, part, limit, skip) + return await self.db.aql_cursor( + query=q_string, + count=with_count, + bind_vars=bind, + batch_size=10000, + ttl=cast(Number, int(timeout.total_seconds())) if timeout else None, + ) + async def search_list( self, query: QueryModel, with_count: bool = False, timeout: Optional[timedelta] = None, **kwargs: Any ) -> AsyncCursorContext: @@ -1496,6 +1530,18 @@ async def abort_update(self, batch_id: str) -> None: await self.real.abort_update(batch_id) await self.event_sender.core_event(CoreEvent.BatchUpdateAborted, {"graph": self.graph_name, "batch": info}) + async def list_possible_values( + self, + query: QueryModel, + path_or_predicate: Union[str, Predicate], + part: Literal["attributes", "values"], + limit: Optional[int] = None, + skip: Optional[int] = None, + with_count: bool = False, + timeout: Optional[timedelta] = None, + ) -> AsyncCursorContext: + return await self.real.list_possible_values(query, path_or_predicate, part, limit, skip, with_count, timeout) + async def search_list( self, query: QueryModel, with_count: bool = False, timeout: Optional[timedelta] = None, **kwargs: Any ) -> AsyncCursorContext: diff --git a/resotocore/resotocore/static/api-doc.yaml b/resotocore/resotocore/static/api-doc.yaml index 7735331a5..23bd45080 100644 --- a/resotocore/resotocore/static/api-doc.yaml +++ b/resotocore/resotocore/static/api-doc.yaml @@ -1806,6 +1806,129 @@ paths: application/x-ndjson: schema: $ref: "#/components/schemas/Aggregated" + /graph/{graph_id}/property/attributes: + post: + summary: "Search the graph and return all possible attribute names for given property path." + tags: + - graph_search + parameters: + - name: graph_id + in: path + example: resoto + description: "The identifier of the graph" + required: true + schema: + type: string + - name: prop + in: query + example: | + tags + description: "The property path to search for with an optional predicate" + required: true + schema: + type: string + - name: section + in: query + description: "The name of the section used for all property paths. If not defined root is assumed." + required: false + schema: + type: string + enum: + - reported + - desired + - metadata + - name: count + in: query + description: "Optional parameter to get a Ck-Element-Count header which returns the number of returned json elements" + required: false + schema: + type: boolean + default: true + requestBody: + description: "The search to perform" + content: + text/plain: + schema: + type: string + example: is(graph_root) and reported.name=="root" --> + responses: + "200": + description: "The result of this search in the defined format" + content: + "application/json": + schema: + type: array + example: | + [ + "owner", + "checksum/secret", + "prometheus.io/path", + ] + items: + type: string + /graph/{graph_id}/property/values: + post: + summary: "Search the graph and return all possible attribute values for given property path." + tags: + - graph_search + parameters: + - name: graph_id + in: path + example: resoto + description: "The identifier of the graph" + required: true + schema: + type: string + - name: prop + in: query + example: | + tags + description: "The property path to search for with an optional predicate" + required: true + schema: + type: string + - name: section + in: query + description: "The name of the section used for all property paths. If not defined root is assumed." + required: false + schema: + type: string + enum: + - reported + - desired + - metadata + - name: count + in: query + description: "Optional parameter to get a Ck-Element-Count header which returns the number of returned json elements" + required: false + schema: + type: boolean + default: true + requestBody: + description: "The search to perform" + content: + text/plain: + schema: + type: string + example: is(graph_root) and reported.name=="root" --> + responses: + "200": + description: "The result of this search in the defined format" + content: + "application/json": + schema: + type: array + example: | + [ + "owner", + "checksum/secret", + "prometheus.io/path", + ] + items: + type: string + + + # endregion # region events diff --git a/resotocore/resotocore/web/api.py b/resotocore/resotocore/web/api.py index c97c55c0a..35c9b3fd8 100644 --- a/resotocore/resotocore/web/api.py +++ b/resotocore/resotocore/web/api.py @@ -27,6 +27,7 @@ Callable, Awaitable, Iterable, + Literal, ) from urllib.parse import urlencode, urlparse, parse_qs, urlunparse @@ -58,7 +59,6 @@ from resotocore.analytics import AnalyticsEvent from resotocore.cli.command import alias_names -from resotocore.dependencies import Dependencies, TenantDependencies from resotocore.cli.model import ( ParsedCommandLine, CLIContext, @@ -73,6 +73,8 @@ from resotocore.console_renderer import ConsoleColorSystem, ConsoleRenderer from resotocore.db.graphdb import GraphDB, HistoryChange from resotocore.db.model import QueryModel +from resotocore.dependencies import Dependencies, TenantDependencies +from resotocore.dependencies import TenantDependencyProvider from resotocore.error import NotFoundError, NotEnoughPermissions from resotocore.ids import ( TaskId, @@ -91,6 +93,8 @@ from resotocore.model.json_schema import json_schema from resotocore.model.model import Kind, Model from resotocore.model.typed_model import to_json, from_js, to_js_str, to_js +from resotocore.query.model import Predicate, PathRoot, variable_to_absolute +from resotocore.query.query_parser import predicate_term from resotocore.report import Benchmark, ReportCheck from resotocore.service import Service from resotocore.task.model import Subscription @@ -99,7 +103,6 @@ from resotocore.util import uuid_str, force_gen, rnd_str, if_set, duration, utc_str, parse_utc, async_noop, utc from resotocore.web.auth import raw_jwt_from_auth_message, LoginWithCode, AuthHandler from resotocore.web.content_renderer import result_binary_gen, single_result -from resotocore.dependencies import TenantDependencyProvider from resotocore.web.directives import ( metrics_handler, error_handler, @@ -226,6 +229,8 @@ def __add_routes(self, prefix: str) -> None: web.post(prefix + "/graph/{graph_id}/search/aggregate", require(self.query_aggregation, r)), web.post(prefix + "/graph/{graph_id}/search/history/list", require(self.query_history, r)), web.post(prefix + "/graph/{graph_id}/search/history/aggregate", require(self.query_history, r)), + web.post(prefix + "/graph/{graph_id}/property/attributes", require(self.possible_values, r)), + web.post(prefix + "/graph/{graph_id}/property/values", require(self.possible_values, r)), # maintain the graph web.patch(prefix + "/graph/{graph_id}/nodes", require(self.update_nodes, a)), web.post(prefix + "/graph/{graph_id}/merge", require(self.merge_graph, a)), @@ -1016,6 +1021,25 @@ async def explain(self, request: Request, deps: TenantDependencies) -> StreamRes result = await graph_db.explain(query_model) return web.json_response(to_js(result)) + async def possible_values(self, request: Request, deps: TenantDependencies) -> StreamResponse: + graph_db, query_model = await self.graph_query_model_from_request(request, deps) + section = section_of(request) + detail: Literal["attributes", "values"] = "attributes" if request.path.endswith("attributes") else "values" + root_or_section = None if section is None or section == PathRoot else section + fn = partial(variable_to_absolute, root_or_section) + prop = request.query["prop"] # fail if not provided + limit = if_set(request.query.get("limit"), int) + skip = if_set(request.query.get("skip"), int) + count = request.query.get("count", "true").lower() != "false" + try: + prop_or_predicate: Union[Predicate, str] = predicate_term.parse(prop).change_variable(fn) + except Exception: + prop_or_predicate = fn(prop) + async with await graph_db.list_possible_values( + query_model, prop_or_predicate, detail, limit, skip, count + ) as cursor: + return await self.stream_response_from_gen(request, cursor, cursor.count()) + async def query_structure(self, request: Request, deps: TenantDependencies) -> StreamResponse: _, query_model = await self.graph_query_model_from_request(request, deps) return web.json_response(query_model.query.structure()) diff --git a/resotocore/tests/resotocore/db/arango_query_test.py b/resotocore/tests/resotocore/db/arango_query_test.py index 598119293..36b24cb2b 100644 --- a/resotocore/tests/resotocore/db/arango_query_test.py +++ b/resotocore/tests/resotocore/db/arango_query_test.py @@ -1,12 +1,12 @@ import pytest from resotocore.db import EstimatedSearchCost, EstimatedQueryCostRating -from resotocore.db.arango_query import to_query, query_cost, fulltext_term_combine +from resotocore.db.arango_query import to_query, query_cost, fulltext_term_combine, possible_values from resotocore.db.graphdb import GraphDB from resotocore.db.model import QueryModel from resotocore.model.model import Model from resotocore.query.model import Query, Sort, P -from resotocore.query.query_parser import parse_query +from resotocore.query.query_parser import parse_query, predicate_term def test_sort_order_for_synthetic_prop(foo_model: Model, graph_db: GraphDB) -> None: @@ -196,3 +196,86 @@ def test_aggregation(foo_model: Model, graph_db: GraphDB) -> None: 'RETURN {"group":{"name": var_0, "c": var_1}, "max_of_num": fn_0, ' '"min_of_a_x": fn_1, "sum_of_a_b_d": fn_2}' in q ) + + +def test_possible_values(foo_model: Model, graph_db: GraphDB) -> None: + # attributes: simple path + model = QueryModel(parse_query("is(foo)"), foo_model) + pv, bv = possible_values(graph_db, model, "reported.tags", "attributes") + assert pv == ( + "LET filter0 = (FOR m0 in `ns` FILTER @b0 IN m0.kinds RETURN m0) " # query + "LET m2 = ( FOR m3 in filter0 FILTER IS_OBJECT(m3.reported.tags) " # make sure that the property is an object + "FOR m4 IN ATTRIBUTES(m3.reported.tags, true) RETURN m4) " # iterate over all properties of the object path + "LET m5 = (FOR m6 IN m2 FILTER m6!=null RETURN DISTINCT m6)" # filter null, distinct + "FOR m7 IN m5 SORT m7 ASC RETURN m7" + ) # sort and return + assert bv == {"b0": "foo"} + # attributes: predicate + tags_with_a = predicate_term.parse('reported.tags =~ "^a.*"') + pv, bv = possible_values(graph_db, model, tags_with_a, "attributes") + assert pv == ( + "LET filter0 = (FOR m0 in `ns` FILTER @b0 IN m0.kinds RETURN m0) " + "LET m2 = ( FOR m3 in filter0 FILTER IS_OBJECT(m3.reported.tags) " + "FOR m4 IN ATTRIBUTES(m3.reported.tags, true) RETURN m4) " + "LET m5 = (FOR m6 IN m2 FILTER m6!=null FILTER REGEX_TEST(m6, @b1, true) " # filter by null and regex + "RETURN DISTINCT m6)FOR m7 IN m5 SORT m7 ASC RETURN m7" + ) + + assert bv == {"b0": "foo", "b1": "^a.*"} + # attributes: predicate over array value + pred = predicate_term.parse("reported.pod_spec.containers[*].security_context.run_as_user not in [1000, 10001]") + pv, bv = possible_values(graph_db, model, pred, "attributes") + assert pv == ( + "LET filter0 = (FOR m0 in `ns` FILTER @b0 IN m0.kinds RETURN m0) " + "LET m2 = ( FOR m3 in filter0 FOR m4 IN TO_ARRAY(m3.reported.pod_spec.containers) " # expand nested arrays + "FILTER IS_OBJECT(m4.security_context.run_as_user) " + "FOR m5 IN ATTRIBUTES(m4.security_context.run_as_user, true) RETURN m5) " + "LET m6 = (FOR m7 IN m2 FILTER m7!=null FILTER m7 not in @b1 RETURN DISTINCT m7)" + "FOR m8 IN m6 SORT m8 ASC RETURN m8" + ) + assert bv == {"b0": "foo", "b1": [1000, 10001]} + # attributes: array as last element + pv, bv = possible_values(graph_db, model, "reported.pod_spec.containers[*]", "attributes") + assert pv == ( + "LET filter0 = (FOR m0 in `ns` FILTER @b0 IN m0.kinds RETURN m0) " + "LET m2 = ( FOR m3 in filter0 FOR m4 IN TO_ARRAY(m3.reported.pod_spec.containers[*]) " + "FILTER IS_OBJECT(m4) FOR m5 IN ATTRIBUTES(m4, true) RETURN m5) " + "LET m6 = (FOR m7 IN m2 FILTER m7!=null RETURN DISTINCT m7)" + "FOR m8 IN m6 SORT m8 ASC RETURN m8" + ) + # values: simple path + pv, bv = possible_values(graph_db, model, "reported.tags.test", "values") + assert pv == ( + "LET filter0 = (FOR m0 in `ns` FILTER @b0 IN m0.kinds RETURN m0) " # query + "LET m2 = ( FOR m3 in filter0 RETURN m3.reported.tags.test) " # select the property value + "LET m4 = (FOR m5 IN m2 FILTER m5!=null RETURN DISTINCT m5)" # filter null, distinct + "FOR m6 IN m4 SORT m6 ASC RETURN m6" + ) # sort and return + assert bv == {"b0": "foo"} + # values: predicate + pv, bv = possible_values(graph_db, model, tags_with_a, "values") + assert pv == ( + "LET filter0 = (FOR m0 in `ns` FILTER @b0 IN m0.kinds RETURN m0) " + "LET m2 = ( FOR m3 in filter0 RETURN m3.reported.tags) " + "LET m4 = (FOR m5 IN m2 FILTER m5!=null FILTER REGEX_TEST(m5, @b1, true) RETURN DISTINCT m5)" # filter by null and regex + "FOR m6 IN m4 SORT m6 ASC RETURN m6" + ) + assert bv == {"b0": "foo", "b1": "^a.*"} + # values: predicate over array value + pv, bv = possible_values(graph_db, model, pred, "attributes") + assert pv == ( + "LET filter0 = (FOR m0 in `ns` FILTER @b0 IN m0.kinds RETURN m0) " + "LET m2 = ( FOR m3 in filter0 FOR m4 IN TO_ARRAY(m3.reported.pod_spec.containers) " # expand nested arrays + "FILTER IS_OBJECT(m4.security_context.run_as_user) " + "FOR m5 IN ATTRIBUTES(m4.security_context.run_as_user, true) RETURN m5) " + "LET m6 = (FOR m7 IN m2 FILTER m7!=null FILTER m7 not in @b1 RETURN DISTINCT m7)" + "FOR m8 IN m6 SORT m8 ASC RETURN m8" + ) + assert bv == {"b0": "foo", "b1": [1000, 10001]} + # limit the result + pv, bv = possible_values(graph_db, model, "reported.tags", "values", limit=10) + print(pv) + assert pv.endswith("SORT m6 ASC LIMIT 0, 10 RETURN m6") + # skip and limit the result + pv, bv = possible_values(graph_db, model, "reported.tags", "values", limit=10, skip=20) + assert pv.endswith("SORT m6 ASC LIMIT 20, 10 RETURN m6") diff --git a/resotocore/tests/resotocore/db/graphdb_test.py b/resotocore/tests/resotocore/db/graphdb_test.py index bd8e0555e..e3a7cde07 100644 --- a/resotocore/tests/resotocore/db/graphdb_test.py +++ b/resotocore/tests/resotocore/db/graphdb_test.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod from datetime import date, datetime, timedelta from random import SystemRandom -from typing import List, Optional, Any, Dict, cast, AsyncIterator, Tuple +from typing import List, Optional, Any, Dict, cast, AsyncIterator, Tuple, Union, Literal from arango.database import StandardDatabase from arango.typings import Json @@ -20,8 +20,8 @@ from resotocore.model.graph_access import GraphAccess, EdgeTypes, Section from resotocore.model.model import Model, UsageDatapoint from resotocore.model.typed_model import from_js, to_js -from resotocore.query.model import Query, P, Navigation -from resotocore.query.query_parser import parse_query +from resotocore.query.model import Query, P, Navigation, Predicate +from resotocore.query.query_parser import parse_query, predicate_term from resotocore.types import JsonElement, EdgeType from resotocore.util import AccessJson, utc, value_in_path, AccessNone from tests.resotocore.utils import eventually @@ -667,6 +667,21 @@ async def validate(original_db_name: GraphName, copy_db_name: str) -> None: await snapshot_db.wipe() +@mark.asyncio +async def test_list_possible_values(filled_graph_db: ArangoGraphDB, foo_model: Model) -> None: + async def pv(q: str, path_or_pred: Union[str, Predicate], detail: Literal["attributes", "values"]) -> List[Any]: + qm = QueryModel(parse_query(q), foo_model) + async with await filled_graph_db.list_possible_values(qm, path_or_pred, detail) as cursor: + return [a async for a in cursor] + + props_of_b = ["ctime", "f", "g", "h", "identifier", "kind", "name", "now"] + assert await pv("is(bla)", "reported.f", "values") == [23] + assert await pv("is(bla)", "reported.h.inner[*].inner[*].name", "values") == ["in_0_0", "in_0_1"] + assert await pv("is(bla)", "reported.g[*]", "values") == [1, 2, 3, 4] + assert await pv("is(bla)", "reported", "attributes") == props_of_b + assert await pv("is(bla)", predicate_term.parse('reported=~"^[fgh]"'), "attributes") == ["f", "g", "h"] + + @mark.asyncio async def test_no_snapshot_usage(graph_db: ArangoGraphDB, foo_model: Model, db_access: DbAccess) -> None: await graph_db.wipe()