Skip to content

Commit

Permalink
[resotocore][feat] Add with_usage to define usage data (#1699)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Jul 4, 2023
1 parent a60ebb8 commit e6041b5
Show file tree
Hide file tree
Showing 16 changed files with 329 additions and 83 deletions.
68 changes: 61 additions & 7 deletions resotocore/resotocore/db/arango_query.py
Expand Up @@ -2,10 +2,11 @@
import logging
import re
from collections import defaultdict
from attrs import evolve
from textwrap import dedent
from typing import Union, List, Tuple, Any, Optional, Dict, Set

from arango.typings import Json
from attrs import evolve

from resotocore.constants import less_greater_then_operations as lgt_ops, arangodb_matches_null_ops
from resotocore.db import EstimatedSearchCost, EstimatedQueryCostRating as Rating
Expand Down Expand Up @@ -38,8 +39,10 @@
FulltextTerm,
Limit,
ContextTerm,
WithUsage,
)
from resotocore.util import first, set_value_in_path, exist
from resotocore.util import first, set_value_in_path, exist, utc_str
from resotolib.durations import duration_str

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -400,6 +403,51 @@ def filter_statement(current_cursor: str, part_term: Term, limit: Optional[Limit
query_part += f"LET {filtered_out} = {reverse}({for_stmt}{return_stmt})"
return filtered_out

def with_usage(in_crsr: str, usage: WithUsage, term: Term, limit: Optional[Limit]) -> str:
nonlocal query_part

# split the term and create a filter statement for everything before the usage predicates
before_term, after_term = term.split_by_usage()

# the limit is applied here, when the after term does not filter at all
after_filter_cursor = filter_statement(in_crsr, before_term, limit=limit if after_term.is_all else None)

# add the usage predicates
usage_crs = next_crs("with_usage")
start = next_bind_var_name()
end = next_bind_var_name()
start_s = next_bind_var_name()
duration = next_bind_var_name()
start_time = usage.start_from_now()
end_time = usage.end_from_now()
bind_vars[start] = start_time.timestamp()
bind_vars[end] = end_time.timestamp()
bind_vars[start_s] = utc_str(start_time)
bind_vars[duration] = duration_str(end_time - start_time)
avgs = []
merges = []
for mn in usage.metrics:
avgs.append(f"{mn}_min = MIN(m.v.{mn}[0]), {mn}_avg = AVG(m.v.{mn}[1]), {mn}_max = MAX(m.v.{mn}[2])")
merges.append(f"{mn}: {{min: {mn}_min, avg: {mn}_avg, max: {mn}_max}}")
query_part += dedent(
f"""
let {usage_crs} = (
for r in {after_filter_cursor}
let resource=r
let resource_usage = first(
for m in {db.usage_db.collection_name}
filter m.at>=@{start} and m.at<=@{end} and m.id==r._key
collect aggregate {", ".join(avgs)}, count = sum(1)
return {{usage:{{{",".join(merges)},entries:count,start:@{start_s},duration:@{duration}}}}}
)
return resource_usage.usage.entries ? merge(resource, resource_usage) : resource
)
"""
)

# finally apply the filter that includes the usage predicates
return filter_statement(usage_crs, after_term, limit)

def with_clause(in_crsr: str, clause: WithClause, limit: Optional[Limit]) -> str:
nonlocal query_part
# this is the general structure of the with_clause that is created
Expand Down Expand Up @@ -528,16 +576,22 @@ def navigation(in_crsr: str, nav: Navigation) -> str:
# apply the limit in the filter statement only, when no with clause is present
# otherwise the limit is applied in the with clause
filter_limit = p.limit if p.with_clause is None else None
cursor = in_cursor
part_term = p.term
if isinstance(p.term, MergeTerm):
# do not allow a limit in the prefilter
filter_cursor = filter_statement(in_cursor, p.term.pre_filter, None)
# only allow a limit in the prefilter, if there is no post filter
pre_limit = filter_limit if (p.term.post_filter is None or p.term.post_filter.is_all) else None
filter_cursor = filter_statement(cursor, p.term.pre_filter, pre_limit)
cursor, merge_part = merge(filter_cursor, p.term.merge)
query_part += merge_part
post = p.term.post_filter if p.term.post_filter else AllTerm()
# always do the post filter in case of sort or limit
cursor = filter_statement(cursor, post, filter_limit)
part_term = p.term.post_filter if p.term.post_filter else AllTerm()
if p.with_usage and len(p.with_usage.metrics) > 0:
# the filter is applied inside with_usage
cursor = with_usage(cursor, p.with_usage, part_term, filter_limit)
else:
cursor = filter_statement(in_cursor, p.term, filter_limit)
cursor = filter_statement(cursor, part_term, filter_limit)

cursor = with_clause(cursor, p.with_clause, p.limit) if p.with_clause else cursor
cursor = navigation(cursor, p.navigation) if p.navigation else cursor
return p, cursor, filtered_out, query_part
Expand Down
14 changes: 0 additions & 14 deletions resotocore/resotocore/db/async_arangodb.py
Expand Up @@ -28,7 +28,6 @@

from resotocore.async_extensions import run_async
from resotocore.error import QueryTookToLongError
from resotocore.metrics import timed
from resotocore.util import identity
from resotocore.ids import GraphName

Expand Down Expand Up @@ -158,7 +157,6 @@ class AsyncArangoDBBase:
def __init__(self, db: Database):
    """
    Keep a reference to the given database.

    :param db: the database handle, stored as `self.db`.
    """
    self.db = db

@timed("arango", "aql")
async def aql_cursor(
self,
query: str,
Expand Down Expand Up @@ -208,7 +206,6 @@ async def aql_cursor(
)
return AsyncCursorContext(cursor, trafo)

@timed("arango", "aql")
async def aql(
self,
query: str,
Expand Down Expand Up @@ -256,7 +253,6 @@ async def aql(
max_runtime,
)

@timed("arango", "explain")
async def explain(
self,
query: str,
Expand All @@ -267,7 +263,6 @@ async def explain(
) -> Union[Json, Jsons]:
return await run_async(self.db.aql.explain, query, all_plans, max_plans, opt_rules, bind_vars) # type: ignore

@timed("arango", "execute_transaction")
async def execute_transaction(
self,
command: str,
Expand Down Expand Up @@ -295,7 +290,6 @@ async def execute_transaction(
intermediate_commit_size,
)

@timed("arango", "get")
async def get(
self,
collection: str,
Expand All @@ -305,7 +299,6 @@ async def get(
) -> Optional[Json]:
return await run_async(self.db.collection(collection).get, document, rev, check_rev) # type: ignore

@timed("arango", "insert")
async def insert(
self,
collection: str,
Expand Down Expand Up @@ -333,7 +326,6 @@ async def insert(
merge,
)

@timed("arango", "update")
async def update(
self,
collection: str,
Expand All @@ -358,7 +350,6 @@ async def update(
silent,
)

@timed("arango", "delete")
async def delete(
self,
collection: str,
Expand All @@ -381,18 +372,15 @@ async def delete(
silent,
)

@timed("arango", "all")
async def all(self, collection: str, skip: Optional[int] = None, limit: Optional[int] = None) -> Cursor:
    """
    Invoke `all` of the named collection through `run_async` and await the result.

    :param collection: name of the collection to read.
    :param skip: forwarded as the first argument of the collection's `all` (optional).
    :param limit: forwarded as the second argument of the collection's `all` (optional).
    :return: the awaited result of the call.
    """
    fetch_all = self.db.collection(collection).all
    return await run_async(fetch_all, skip, limit)  # type: ignore

@timed("arango", "keys")
async def keys(self, collection: str) -> Cursor:
    """
    Invoke `keys` of the named collection through `run_async` and await the result.

    :param collection: name of the collection to read.
    :return: the awaited result of the call.
    """
    fetch_keys = self.db.collection(collection).keys
    return await run_async(fetch_keys)  # type: ignore

async def count(self, collection: str) -> int:
    """
    Invoke `count` of the named collection through `run_async` and await the result.

    :param collection: name of the collection to count.
    :return: the awaited result of the call.
    """
    count_documents = self.db.collection(collection).count
    return await run_async(count_documents)  # type: ignore

@timed("arango", "insert_many")
async def insert_many(
self,
collection: str,
Expand All @@ -406,7 +394,6 @@ async def insert_many(
fn = self.db.collection(collection).insert_many
return await run_async(fn, documents, return_new, sync, silent, overwrite, return_old) # type: ignore

@timed("arango", "update_many")
async def update_many(
self,
collection: str,
Expand All @@ -424,7 +411,6 @@ async def update_many(
fn, documents, check_rev, merge, keep_none, return_new, return_old, sync, silent # type: ignore
)

@timed("arango", "delete_many")
async def delete_many(
self,
collection: str,
Expand Down
11 changes: 9 additions & 2 deletions resotocore/resotocore/metrics.py
@@ -1,3 +1,4 @@
import logging
import time
from functools import wraps
from typing import Callable, Any, TypeVar, cast
Expand All @@ -15,6 +16,8 @@
# This way all signature information is preserved!
DecoratedFn = TypeVar("DecoratedFn", bound=Callable[..., Any])

log = logging.getLogger(__name__)


def perf_now() -> float:
    """Return the current reading of `time.perf_counter` (fractional seconds)."""
    now: float = time.perf_counter()
    return now
Expand All @@ -38,7 +41,9 @@ def async_time_decorated(*args: Any, **kwargs: Any) -> Any:
rv = fn(*args, **kwargs)
return rv
finally:
metric.observe(perf_now() - start_time)
duration = perf_now() - start_time
log.debug(f"Duration of {module}::{name}: {duration}")
metric.observe(duration)

return cast(DecoratedFn, async_time_decorated)

Expand All @@ -50,7 +55,9 @@ async def async_time_decorated(*args: Any, **kwargs: Any) -> Any:
rv = await fn(*args, **kwargs)
return rv
finally:
metric.observe(perf_now() - start_time)
duration = round((perf_now() - start_time) * 1000)
log.debug(f"Duration of {module}::{name}: {duration} millis")
metric.observe(duration)

return cast(DecoratedFn, async_time_decorated)

Expand Down
2 changes: 1 addition & 1 deletion resotocore/resotocore/model/graph_access.py
Expand Up @@ -76,7 +76,7 @@ class Section:
content = set(content_ordered)

# The list of all lookup sections
lookup_sections_ordered = [ancestors, descendants]
lookup_sections_ordered = [ancestors, descendants, usage]

# The list of all sections
all_ordered = [*content_ordered, *lookup_sections_ordered]
Expand Down

0 comments on commit e6041b5

Please sign in to comment.