Skip to content

Commit

Permalink
[resotocore][feat] Add timeseries command (#1840)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Dec 4, 2023
1 parent 8f75b5f commit 628e830
Show file tree
Hide file tree
Showing 23 changed files with 674 additions and 122 deletions.
2 changes: 1 addition & 1 deletion resotocore/resotocore/cli/cli.py
Expand Up @@ -394,7 +394,7 @@ async def parse_query(query_arg: str) -> Query:
query_part = "all" if query_part.strip() == "" else query_part
# section expansion is disabled here: it will happen on the final query after all parts have been combined
return await self.dependencies.template_expander.parse_query(
"".join(query_part), None, omit_section_expansion=True, **ctx.env
"".join(query_part), None, omit_section_expansion=True, env=ctx.env
)

query: Query = Query.by(AllTerm())
Expand Down
120 changes: 116 additions & 4 deletions resotocore/resotocore/cli/command.py
Expand Up @@ -51,7 +51,7 @@
from attr import evolve
from attrs import define, field
from dateutil import parser as date_parser
from parsy import Parser, string
from parsy import Parser, string, ParseError
from resotoclient.models import Model as RCModel, Kind as RCKind
from resotodatalink import EngineConfig
from resotodatalink.batch_stream import BatchStream
Expand Down Expand Up @@ -140,7 +140,7 @@
NavigateUntilLeaf,
IsTerm,
)
from resotocore.query.query_parser import parse_query, aggregate_parameter_parser
from resotocore.query.query_parser import parse_query, aggregate_parameter_parser, predicate_term
from resotocore.query.template_expander import tpl_props_p
from resotocore.report import BenchmarkConfigPrefix, ReportSeverity
from resotocore.report.benchmark_renderer import respond_benchmark_result
Expand Down Expand Up @@ -170,6 +170,7 @@
)
from resotocore.worker_task_queue import WorkerTask, WorkerTaskName
from resotolib.core import CLIEnvelope
from resotolib.durations import parse_duration
from resotolib.parse_util import (
double_quoted_or_simple_string_dp,
space_dp,
Expand Down Expand Up @@ -1431,7 +1432,7 @@ def parse_at(x: Optional[str]) -> Optional[datetime]:
at: Optional[datetime] = parse_at(parsed.get("at", None))

# all templates are expanded at this point, so we can call the parser directly.
query = parse_query(rest, **ctx.env)
query = parse_query(rest, ctx.env)

async def get_db(at: Optional[datetime], graph_name: GraphName) -> Tuple[GraphDB, GraphName]:
db_access = self.dependencies.db_access
Expand Down Expand Up @@ -3904,7 +3905,7 @@ async def put_template(name: str, template_query: str) -> AsyncIterator[str]:
# try to render_console the template with dummy values and see if the search can be parsed
try:
rendered_query = self.dependencies.template_expander.render(template_query, defaultdict(lambda: True))
parse_query(rendered_query, **ctx.env)
parse_query(rendered_query, ctx.env)
except Exception as ex:
raise CLIParseError(f"Given template does not define a valid search: {template_query}") from ex
await self.dependencies.template_expander.put_template(Template(name, template_query))
Expand Down Expand Up @@ -6036,6 +6037,115 @@ def key_fn(node: Json) -> Union[str, Tuple[str, str]]:
raise AttributeError("Wrong or insufficient arguments. Execute `help db` to get more information.")


class TimeSeriesCommand(CLICommand):
"""
```
timeseries snapshot --name <time series> <aggregate search>
timeseries get --name <time series> --start <time> --end <time> --granularity <duration|integer>
--group <group> --filter <var><op><value>
```
The timeseries command can be used to add data to a time series or to retrieve data from a time series.
To append values to a time series, it's necessary to perform an aggregate search query on a graph.
As this graph reflects the present state of affairs,
running an aggregate search on it essentially captures the current state into the time series.
By regularly taking snapshots of the time series using the same aggregate search query,
you can effectively track how things evolve over time.
To retrieve data from a time series, you need to specify the name of the time series, as well as a time range.
It is possible to group the data by a certain variable, and to filter the data by a certain variable.
## Parameters
- `--name` - The name of the time series.
- `--start` - The start of the time range.
- `--end` - The end of the time range.
- `--group` - The variable(s) to select in the group. Can be repeated multiple times.
- `--filter` - The variable(s) to filter. Can be repeated multiple times.
- `--granularity` - The granularity of the time series. Can be a duration or an integer.
## Examples
```
# compute the number of instances per instance type and store it in a time series
> timeseries snapshot --name instances aggregate(instance_type: sum(1)): is(instance)
# retrieve the number of instances per instance type since yesterday
> timeseries get --name instances --start @yesterday@ --end @now@
```
"""

@property
def name(self) -> str:
return "timeseries"

def info(self) -> str:
return "Operations on time series."

def args_info(self) -> ArgsInfo:
return {
"snapshot": [
ArgInfo("--name", help_text="<time series>."),
ArgInfo(expects_value=True, value_hint="search"),
],
"get": [
ArgInfo("--name", expects_value=True, help_text="<time series>"),
ArgInfo("--start", expects_value=True, value_hint="datetime", help_text="<time series>"),
ArgInfo("--end", expects_value=True, value_hint="datetime", help_text="<time series>"),
ArgInfo("--group", expects_value=True, can_occur_multiple_times=True, help_text="<group>"),
ArgInfo("--filter", expects_value=True, can_occur_multiple_times=True, help_text="<var><op><value>"),
ArgInfo("--granularity", expects_value=True, help_text="<duration|integer>"),
],
}

def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwargs: Any) -> CLIAction:
async def snapshot_time_series(part: str) -> AsyncIterator[str]:
parser = NoExitArgumentParser()
parser.add_argument("--name", type=str, required=True)
parsed, rest = parser.parse_known_args(args_parts_parser.parse(part))
graph_name = ctx.graph_name
graph_db = self.dependencies.db_access.get_graph_db(graph_name)
model = await self.dependencies.model_handler.load_model(graph_name)
query = await self.dependencies.template_expander.parse_query(" ".join(rest), ctx.section, env=ctx.env)
query_model = QueryModel(query, model, ctx.env)
res = await self.dependencies.db_access.time_series_db.add_entries(parsed.name, query_model, graph_db)
yield f"{res} entries added to time series {parsed.name}."

async def load_time_series(part: str) -> Tuple[CLISourceContext, AsyncIterator[Json]]:
def parse_duration_or_int(s: str) -> Union[int, timedelta]:
try:
return parse_duration(s)
except ParseError:
return int(s)

parser = NoExitArgumentParser()
parser.add_argument("--name", type=str, required=True)
parser.add_argument("--start", type=parse_time_or_delta, required=True)
parser.add_argument("--end", type=parse_time_or_delta, required=True)
parser.add_argument("--group", type=str, nargs="*", default=None)
parser.add_argument("--filter", type=predicate_term.parse, nargs="*", default=None)
parser.add_argument("--granularity", type=parse_duration_or_int, default=5)
p = parser.parse_args(args_parts_unquoted_parser.parse(part))
cursor = await self.dependencies.db_access.time_series_db.load_time_series(
p.name, p.start, p.end, group_by=p.group, filter_by=p.filter, granularity=p.granularity
)
return CLISourceContext(cursor.count(), cursor.full_count()), cursor

args = re.split("\\s+", arg, maxsplit=1) if arg else []
if arg and len(args) == 2 and args[0] == "snapshot":
return CLISource.single(
partial(snapshot_time_series, args[1].strip()), required_permissions={Permission.read}
)
elif arg and len(args) == 2 and args[0] == "get":
return CLISource(partial(load_time_series, args[1].strip()), required_permissions={Permission.read})
else:
return CLISource.single(
lambda: stream.just(self.rendered_help(ctx)), required_permissions={Permission.read}
)


def all_commands(d: TenantDependencies) -> List[CLICommand]:
commands = [
AggregateCommand(d, "search"),
Expand Down Expand Up @@ -6077,6 +6187,7 @@ def all_commands(d: TenantDependencies) -> List[CLICommand]:
SystemCommand(d, "setup", allowed_in_source_position=True),
TagCommand(d, "action"),
TailCommand(d, "search"),
TimeSeriesCommand(d, "action", allowed_in_source_position=True),
UniqCommand(d, "misc"),
UserCommand(d, "setup", allowed_in_source_position=True),
WorkflowsCommand(d, "action", allowed_in_source_position=True),
Expand Down Expand Up @@ -6114,6 +6225,7 @@ def alias_names() -> Dict[str, str]:
"job": "jobs",
"lists": "list",
"template": "templates",
"ts": "timeseries",
"workflow": "workflows",
"app": "apps",
"man": "help",
Expand Down
4 changes: 4 additions & 0 deletions resotocore/resotocore/cli/model.py
Expand Up @@ -115,6 +115,10 @@ class CLIContext:
def graph_name(self) -> GraphName:
return GraphName(self.env["graph"])

@property
def section(self) -> str:
return self.env.get("section", PathRoot)

@property
def user_permissions(self) -> Set[Permission]:
return self.user.permissions if self.user else set()
Expand Down
3 changes: 3 additions & 0 deletions resotocore/resotocore/core_config.py
Expand Up @@ -192,6 +192,9 @@ class DatabaseConfig(ConfigObject):
default=False, metadata={"description": "If the connection should not be verified (default: False)"}
)
request_timeout: int = field(default=900, metadata={"description": "Request timeout in seconds (default: 900)"})
time_series_ttl: timedelta = field(
default=timedelta(days=90), metadata={"description": "Time series TTL (default: 90d)"}
)


@define(order=True, hash=True, frozen=True)
Expand Down

0 comments on commit 628e830

Please sign in to comment.