/
command.py
6121 lines (5125 loc) · 249 KB
/
command.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import asyncio
import csv
import io
import json
import logging
import os.path
import re
import shutil
import tarfile
from abc import abstractmethod, ABC
from argparse import Namespace
from asyncio import Future, Task
# noinspection PyProtectedMember
from asyncio.subprocess import Process
from collections import defaultdict
from contextlib import suppress
from datetime import timedelta, datetime
from functools import partial, lru_cache
from itertools import dropwhile, chain
from pathlib import Path
from typing import (
Dict,
List,
Tuple,
Optional,
Any,
AsyncIterator,
Hashable,
Iterable,
Callable,
Awaitable,
cast,
Set,
FrozenSet,
Union,
TYPE_CHECKING,
)
from urllib.parse import urlparse, urlunparse
import aiofiles
import jq
import yaml
from aiofiles.tempfile import TemporaryDirectory
from aiohttp import ClientTimeout, JsonPayload, BasicAuth
from aiostream import stream, pipe
from aiostream.aiter_utils import is_async_iterable
from aiostream.core import Stream
from attr import evolve
from attrs import define, field
from dateutil import parser as date_parser
from parsy import Parser, string
from resotoclient.models import Model as RCModel, Kind as RCKind
from resotodatalink import EngineConfig
from resotodatalink.batch_stream import BatchStream
from resotodatalink.collect_plugins import update_sql
from rich.padding import Padding
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from resotocore import version
from resotocore.async_extensions import run_async
from resotocore.cli import (
JsGen,
NoExitArgumentParser,
args_parts_parser,
args_parts_unquoted_parser,
is_edge,
is_node,
js_value_at,
key_values_parser,
parse_time_or_delta,
strip_quotes,
key_value_parser,
JsStream,
js_value_get,
)
from resotocore.cli.model import (
CLICommand,
CLIContext,
EmptyContext,
CLIAction,
CLISource,
CLIFlow,
InternalPart,
OutputTransformer,
PreserveOutputFormat,
MediaType,
CLIFileRequirement,
ParsedCommand,
NoTerminalOutput,
ArgsInfo,
ArgInfo,
EntityProvider,
FilePath,
CLISourceContext,
)
from resotocore.user.model import Permission, AllowedRoleNames
from resotocore.cli.tip_of_the_day import SuggestionPolicy, SuggestionStrategy, get_suggestion_strategy
from resotocore.config import ConfigEntity
from resotocore.db.async_arangodb import AsyncCursor
from resotocore.db.graphdb import HistoryChange, GraphDB
from resotocore.db.model import QueryModel
from resotocore.db.runningtaskdb import RunningTaskData
from resotocore.system_start import system_info
from resotocore.error import CLIParseError, ClientError, CLIExecutionError, NotEnoughPermissions
from resotocore.ids import ConfigId, TaskId, InfraAppName, TaskDescriptorId, GraphName, Email, Password
from resotocore.infra_apps.manifest import AppManifest
from resotocore.infra_apps.package_manager import Failure
from resotocore.model.graph_access import Section, EdgeTypes
from resotocore.model.model import (
Model,
Kind,
ComplexKind,
DictionaryKind,
SimpleKind,
Property,
ArrayKind,
PropertyPath,
TransformKind,
AnyKind,
)
from resotocore.model.resolve_in_graph import NodePath
from resotocore.model.typed_model import to_json, to_js, from_js
from resotocore.query.model import (
Query,
P,
Template,
NavigateUntilRoot,
Term,
AggregateFunction,
AggregateVariableName,
AggregateVariableCombined,
Aggregate,
AggregateVariable,
Part,
NavigateUntilLeaf,
IsTerm,
)
from resotocore.query.query_parser import parse_query, aggregate_parameter_parser
from resotocore.query.template_expander import tpl_props_p
from resotocore.report import BenchmarkConfigPrefix, ReportSeverity
from resotocore.report.benchmark_renderer import respond_benchmark_result
from resotocore.task.task_description import Job, TimeTrigger, EventTrigger, ExecuteCommand, Workflow, RunningTask
from resotocore.types import Json, JsonElement, EdgeType
from resotocore.user import ResotoUser
from resotocore.util import (
uuid_str,
utc,
if_set,
duration,
identity,
rnd_str,
set_value_in_path,
restart_service,
combine_optional,
value_in_path,
)
from resotocore.web.content_renderer import (
respond_ndjson,
respond_json,
respond_text,
respond_graphml,
respond_dot,
respond_yaml,
respond_cytoscape,
)
from resotocore.worker_task_queue import WorkerTask, WorkerTaskName
from resotolib.core import CLIEnvelope
from resotolib.parse_util import (
double_quoted_or_simple_string_dp,
space_dp,
make_parser,
variable_dp,
literal_dp,
comma_p,
variable_p,
equals_p,
json_value_p,
)
from resotolib.utils import safe_members_in_tarfile, get_local_tzinfo
from resotolib.x509 import write_cert_to_file, write_key_to_file
if TYPE_CHECKING:
from resotocore.dependencies import TenantDependencies
log = logging.getLogger(__name__)
# A SearchCLIPart is a command that can be used on the command line.
# Such a part is not executed, but builds a search, which is executed.
# Therefore, the parse method is implemented in a dummy fashion here.
# The real interpretation happens in CLI.create_query.
class SearchCLIPart(CLICommand, EntityProvider, ABC):
    """Base class for CLI commands that contribute to a search.

    A SearchCLIPart is not executed on its own: it only participates in building
    a search, which is assembled and executed elsewhere (CLI.create_query).
    The parse method is therefore a dummy implementation.
    """

    def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwargs: Any) -> CLIAction:
        # Dummy: the real interpretation of this part happens in CLI.create_query.
        return CLISource.empty()
class SearchPart(SearchCLIPart):
    """
    ```shell
    search [--with-edges] [--explain] <search-statement>
    ```
    This command allows to search the graph using filters, traversals, functions and aggregates.
    ## Options
    - `--with-edges`: Return edges in addition to nodes.
    - `--explain`: Instead of executing the search, analyze its cost.
    - `--at <time|delta>`: Perform search on the snapshot of a graph just before the given time.
    ## Parameters
    - `search-statement` [mandatory]: The search to execute.
    ### Filters
    Filters have the form `path op value`.
    - `path` is the complete path of names in the json structure combined with a dot (e.g. reported.cpu_count).
      In case the path contains elements, that are not json conform,
      they can be put into backticks (e.g. foo.bla.\\`:-)\\`.baz).
    - `operator` is one of: `<=`, `>=`, `>`, `<`, `==`, `!=`, `=~`, `!~`, `in`, `not in`.
      Note: `=` is the same as `==` and `~` is the same as `=~`.
    - value is a json literal (e.g. `"test"`, `23`, `[1, 2, 3]`, `true`, `{"a": 12}`).
      Note: the search statement allows to omit the parentheses for strings most of the time.
      In case it contains whitespace or a special characters, you should put the string into parentheses.
    Example:
    ```shell
    > search reported.cpu_count >= 4
    > search name!="test"
    > search title in ["first", "second"]
    > search some_array[3].test.number > 6
    > search some_array[*].test.number < 4
    ```
    Filters can be combined with `and` and `or` and use parentheses.
    Example:
    ```shell
    > search (cpu_count>=4 and name!="test") or (title in ["first", "second"] and name=="test")
    ```
    ### Traversals
    Outbound traversals are traversals from a node in direction of the edge to another node, while
    inbound traversals walk the graph in opposite direction.
    Assuming 2 nodes with one connecting directed edge: `NodeA ---> NodeB`,
    traversing outbound from `NodeA` will yield `NodeB`, while traversing inbound from `NodeB` will yield `NodeA`.
    The syntax for outbound traversals is `-->` and for inbound traversals is `<--`.
    A traversal can be refined and allows to define the number of levels to walk in the graph:
    - `-[1:1]->` (shorthand for `-->`) starts from the current node and selects all nodes that can be reached by walking
      exactly one step outbound.
    - `-[0:1]->` starts (and includes) the current node and selects all nodes that can be reached by walking exactly
      one step outbound.
    - `-[<x>:<y>]->` walks from the current node to all nodes that can be reached with x steps outbound.
      From here all nodes are selected including all nodes that can be reached in y steps outbound
      relative to the starting node.
    - `-[<x>]->` shorthand `-[<x>:<x>]->`
    - `-[<x>:]->` walks from the current node to all nodes that can be reached with x steps outbound.
      From here all nodes to the graph leafs are selected.
    The same logic is used for inbound traversals (`<--`, `<-[0:1]-`, `<-[2]-`, `<-[2:]-`).
    ### Functions
    There are predefined functions that can be used in combination with any filter.
    - is(<kind>): selects all nodes that are of type <kind> or any subtype of <kind>.
      Example: is(volume) will select all GCP disks and all AWS EC2 volumes, since both types inherit from
      base type volume.
    - id(<identifier>): selects the node with the given node identifier <identifier>.
      Example: id(foo) will select the node with id foo. The id is a synthetic id created by the collector
      and usually does not have a meaning, other than identifying a node uniquely.
    - has_key(<path>): tests if the specified name is defined in the json object.
      Example: is(volume) and has_key(tags, owner)
    ### Aggregations
    Aggregate data by using on of the following functions: `sum`, `avg`, `min`, `max` and `count`.
    Multiple aggregation functions can be applied to the result set by separating them by comma.
    Each aggregation function can be named via an optional `as <name>` clause.
    Aggregation functions can be grouped using aggregation values.
    Multiple grouping values can be defined by separating them via comma.
    Each grouping variable can be renamed via an optional `as <name>` clause.
    Examples:
    ```shell
    > search aggregate(kind: sum(1)): is(volume)
    > search aggregate(kind as kind: sum(1) as count): is(volume)
    > search aggregate(kind, volume_type: sum(1) as count): is(volume)
    > search aggregate(kind: sum(volume_size) as summed, sum(1) as count): is(volume)
    > search aggregate(sum(volume_size) as summed, sum(1) as count): is(volume)
    ```
    ### Sort and Limit
    The number of search results can be limited to a defined number by using limit <limit>
    and sorted by using sort <sort_column> [asc, desc].
    Limit and sort is allowed before a traversal and as last statement to the search result.
    Example:
    ```
    > search is(volume) sort volume_size desc limit 3 <-[2]- sort name limit 1
    ```
    Use --explain to understand the cost of a search. A search explanation has this form (example):
    ```json
    {
        "available_nr_items": 142670,
        "estimated_cost": 61424,
        "estimated_nr_items": 1,
        "full_collection_scan": false,
        "rating": "Simple"
    }
    ```
    - `available_nr_items` describe the number of all available nodes in the graph.
    - `estimated_cost shows` the absolute cost of this search. See rating for an interpreted number.
    - `estimated_nr_items` estimated number of items returned for this search.
      It is computed based on search statistics and heuristics and does not reflect the real number.
    - `full_collection_scan` indicates, if a full collection scan is required.
      In case this is true, the search does not take advantage of any indexes.
    - `rating` The more general rating of this search.
      Simple: The estimated cost is fine - the search will most probably run smoothly.
      Complex: The estimated cost is quite high. Check other properties. Maybe an index can be used?
      Bad: The estimated cost is very high. It will most probably run long and/or will take a lot of resources.
    ## Examples
    ```shell
    # Search all volumes with state available
    > search is(volume) and volume_status=available
    kind=gcp_disk, id=71, name=gke-1, volume_status=available, age=5mo26d, cloud=gcp, account=dev, region=us-central1
    kind=gcp_disk, id=12, name=pvc-2, volume_status=available, age=4mo15d, cloud=gcp, account=eng, region=us-west1
    kind=gcp_disk, id=17, name=pvc-2, volume_status=available, age=9mo29d, cloud=gcp, account=eng, region=us-west1
    # Other sections than reported, need to be defined from the root /
    > search is(volume) and /desired.cleanup=true
    # Sort and limit the number of results
    > search is(volume) sort name asc limit 3
    kind=aws_ec2_volume, id=vol-1, name=adf-image-1, age=2mo1d, cloud=aws, account=general-support, region=us-west-2
    kind=aws_ec2_volume, id=vol-2, name=adf-image-2, age=2mo1d, cloud=aws, account=general-support, region=us-west-2
    # Emit nodes together with the edges
    > search --with-edges id(root) -[0:1]->
    node_id=root, kind=graph_root, id=root, name=root
    node_id=L_tRxI2tn6iLZdK3e8EQ3w, kind=cloud, id=gcp, name=gcp, age=5d5h, cloud=gcp
    root -> L_tRxI2tn6iLZdK3e8EQ3w
    node_id=WYcfqyMIkPAPoAHiEIIKOw, kind=cloud, id=aws, name=aws, age=5d5h, cloud=aws
    root -> WYcfqyMIkPAPoAHiEIIKOw
    # Aggregate resulting nodes
    > search aggregate(kind as kind: sum(1) as count): is(volume)
    group:
        kind: aws_ec2_volume
    count: 1799
    ---
    group:
        kind: gcp_disk
    count: 1100
    # Do not execute the search, but show an explanation of the search cost.
    > search --explain is(graph_root) -[0:1]->
    available_nr_items: 142670
    estimated_cost: 58569
    estimated_nr_items: 8
    full_collection_scan: false
    rating: simple
    # Search all volumes on a snapshot from the past
    > search --at 2023-05-07T12:34:56Z is(volume)
    > search --at 1w is(volume)
    ```
    ## Environment Variables
    - `graph` [default=resoto]: the name of the graph to operate on.
    - `section` [default=reported]: interpret all property paths with respect to this section.
      With section `reported` set, the search `name=~"test"` would be interpreted as `reported.name=~"test"`.
      Note: the resotoshell sets the section to reported by default.
      If you want to quickly override the section on one command line, you can define env vars in from of the
      command line (e.g.: `section=desired search clean==true`). It is possible to use absolute path using `/`,
      so all paths have to be defined from root (e.g.: `search desired.clean==true`)
    See [https://resoto.com/docs](https://resoto.com/docs) for a more detailed explanation of search.
    """

    @property
    def name(self) -> str:
        # CLI command name: `search`
        return "search"

    def info(self) -> str:
        # One-line summary shown in command overviews / help.
        return "Search the graph."

    def args_info(self) -> ArgsInfo:
        # Argument metadata used for shell completion and help rendering.
        # Fix: the docstring documents --explain, but it was missing here.
        return [
            ArgInfo(expects_value=True, value_hint="search"),
            ArgInfo("--with-edges", help_text="include edges in result"),
            ArgInfo("--explain", help_text="analyze the cost of the search instead of executing it"),
            ArgInfo("--at", help_text="timestamp | timedelta"),
        ]
class HistoryPart(SearchCLIPart):
    """
    ```shell
    history [--before <time|delta>] [--after <time|delta>] [--change <change>] [search-statement]
    ```
    Return all changes of the graph based on the given criteria.
    Whenever changes are given to Resoto, a dedicated change event is written as separate entity.
    Following changes are supported:
    - node_created: a node is added to the graph that has not been seen before.
    - node_updated: a node is delivered and is different to the one in the graph.
    - node_deleted: a node is no longer reported and gets deleted from the graph.
    ## Options
    - `--before` <time|delta>: only show changes before this timestamp or timedelta.
    - `--after` <time|delta>: only show changes after this timestamp or timedelta.
    - `--change` <change>: one of `node_created`, `node_deleted`, `node_updated`
    ## Parameters
    - `search-statement`: a search statement to filter the history
    ## Examples
    ```shell
    # Show all nodes changed in the last hour
    > history --after 1h
    change=node_updated, changed_at=2022-01-01T03:00:59Z, kind=kubernetes_config_map, id=73616434 name=leader, cloud=k8s
    change=node_deleted, changed_at=2022-01-01T04:40:59Z, kind=aws_vpc, id=vpc-1, name=resoto-eks, cloud=aws
    # Show all nodes created on 1.1.2022 between 03:00 and 06:00 (UTC)
    > history --change node_created --after 2022-01-01T03:00:00Z --before 2022-01-01T06:00:00Z
    change=node_created, changed_at=2022-01-01T05:40:59Z, kind=aws_iam_role, id=AROA, name=some-role, cloud=aws
    # Show all changes to kubernetes resources in the kube-system namespace
    > history is(kubernetes_resource) and namespace=kube-system
    change=node_created, changed_at=2022-11-18T12:00:49Z, kind=kubernetes_role, name=eks, namespace=kube-system
    change=node_updated, changed_at=2022-11-18T12:00:50Z, kind=kubernetes_config_map, name=cert, namespace=kube-system
    ```
    """

    @property
    def name(self) -> str:
        # CLI command name: `history`
        return "history"

    def info(self) -> str:
        # One-line summary shown in command overviews / help.
        return "Search the history of nodes."

    def args_info(self) -> ArgsInfo:
        # Argument metadata used for shell completion and help rendering.
        return [
            ArgInfo("--after", help_text="timestamp | timedelta", expects_value=True, value_hint="timestamp"),
            ArgInfo("--before", help_text="timestamp | timedelta", expects_value=True, value_hint="timestamp"),
            ArgInfo(
                "--change",
                help_text="type of change",
                expects_value=True,
                # enums are iterable - no need to materialize a list first
                possible_values=[e.value for e in HistoryChange],
            ),
            ArgInfo(expects_value=True, value_hint="search"),
        ]
class SortPart(SearchCLIPart):
    """
    ```shell
    sort <sort_property> [asc|desc], <sort_property> [asc|desc], ...
    ```
    Sort the search results based on the given properties in the given order.
    ## Parameters
    - <sort_property> [mandatory]: the property to sort by.
    - [asc|desc] [optional, default to asc]: the sort order as ascending or descending.
    ## Examples
    ```shell
    # Search all volumes and sort by volume size descending, showing the largest volume first.
    > search is(volume) | sort volume_size desc | head -1
    kind=aws_ec2_volume, id=vol-1, name=vol-2, age=1yr5mo, cloud=aws, account=eng, region=us-west-2
    # Add a second sort criterion
    > search is(volume) | sort volume_size asc, name desc | head -2
    kind=example_volume, id=Vol2, name=Vol2, age=1mo8d, cloud=example, account=Example Account, region=US East
    kind=example_volume, id=Vol1, name=Vol1, age=1mo8d, cloud=example, account=Example Account, region=US West
    # Same search as before, now sort by name ascending
    > search is(volume) | sort volume_size asc, name asc | head -2
    kind=example_volume, id=Vol1, name=Vol1, age=1mo8d, cloud=example, account=Example Account, region=US West
    kind=example_volume, id=Vol2, name=Vol2, age=1mo8d, cloud=example, account=Example Account, region=US East
    ```
    """

    @property
    def name(self) -> str:
        # CLI command name: `sort`
        return "sort"

    def info(self) -> str:
        # One-line summary shown in command overviews / help.
        return "Sort the search results."

    def args_info(self) -> ArgsInfo:
        # Argument metadata used for shell completion and help rendering.
        return [ArgInfo(expects_value=True, help_text="<property> [asc|desc]")]
class LimitPart(SearchCLIPart):
    """
    ```shell
    limit [offset] <nr_items>
    ```
    Limit allows to define an optional offset as well as the number of items to return.
    ## Parameters
    - offset [optional, default to 0]: drop the first number of items and start at defined position.
    - nr_items [mandatory]: the number of items to return.
    ## Examples
    ```shell
    # Return the first 3 results from the search
    > search is(volume) | limit 3
    kind=aws_ec2_volume, id=vol-0, name=fs-0, age=2mo23d, cloud=aws, account=eng, region=us-west-2
    kind=aws_ec2_volume, id=vol-1, name=fs-1, age=2mo23d, cloud=aws, account=eng, region=us-west-2
    kind=aws_ec2_volume, id=vol-2, name=fs-2, age=2mo23d, cloud=aws, account=eng, region=us-west-1
    # Return one result from the search dropping the first 2 items
    > search is(volume) | limit 2, 1
    kind=aws_ec2_volume, id=vol-2, name=fs-2, age=2mo23d, cloud=aws, account=eng, region=us-west-1
    ```
    """

    @property
    def name(self) -> str:
        # CLI command name: `limit`
        return "limit"

    def info(self) -> str:
        # One-line summary shown in command overviews / help.
        return "Limit the number of returned search results."

    def args_info(self) -> ArgsInfo:
        # Argument metadata used for shell completion and help rendering.
        return [ArgInfo(expects_value=True, help_text="[offset], <nr_items> to return")]
class PredecessorsPart(SearchCLIPart):
    """
    ```shell
    predecessors [--with-origin] [edge_type]
    ```
    This command extends an already existing search.
    It will select all predecessors of the currently selected nodes of the search.
    The graph may contain different types of edges (e.g. the `default` graph or the `delete` graph).
    In order to define which graph to walk, the edge_type can be specified.
    If --with-origin is specified, the current element is included in the result set as well.
    Assume node A with descendant B with descendant C: A --> B --> C `search id(C) | predecessors`
    will select B, while `search id(C) | predecessors --with-origin` will select C and B.
    ## Options
    - `--with-origin` [Optional, default to false]: includes the current element into the result set.
    ## Parameters
    - `edge_type` [Optional, default to `default`]: Defines the type of edge to navigate.
    ## Environment Variables
    - `edge_type` [Optional]: Defines the type of the edge to navigate.
    The parameter takes precedence over the env var.
    ## Examples
    ```shell
    > search is(volume) and volume_status=available | predecessors | search is(volume_type)
    kind=gcp_disk_type, name=pd-standard, age=2yr1mo, cloud=gcp, account=eng, region=us-central1, zone=us-central1-a
    kind=gcp_disk_type, name=pd-standard, age=2yr1mo, cloud=gcp, account=sre, region=us-central1, zone=us-central1-a
    kind=aws_ec2_volume_type, name=gp2, age=5d8h, cloud=aws, account=sales, region=us-west-2
    ```
    """

    @property
    def name(self) -> str:
        # CLI command name: `predecessors`
        return "predecessors"

    def info(self) -> str:
        # One-line summary shown in command overviews / help.
        return "Select predecessors of incoming nodes in the graph."

    def args_info(self) -> ArgsInfo:
        # Argument metadata used for shell completion and help rendering.
        return [
            ArgInfo("--with-origin"),
            ArgInfo(expects_value=True, possible_values=["default", "delete"], help_text="edge type"),
        ]

    @staticmethod
    def parse_args(arg: Optional[str], ctx: CLIContext) -> Tuple[int, EdgeType]:
        # Parse "[--with-origin] [edge_type]" into a (origin, edge_type) tuple.
        # origin is 0 when --with-origin is given and 1 otherwise - presumably the
        # start depth of the traversal (0 includes the origin node); confirm at the call site.
        def valid_edge_type(name: str) -> str:
            # Accept only edge types known to the graph model.
            if name in EdgeTypes.all:
                return name
            else:
                raise AttributeError(f'Given name is not a valid edge type: {name}. {", ".join(EdgeTypes.all)}')

        parser = NoExitArgumentParser()
        # --with-origin stores the constant 0 into "origin"; absence keeps the default 1
        parser.add_argument("--with-origin", dest="origin", default=1, action="store_const", const=0)
        # optional positional edge type; falls back to the `edge_type` env var, then the default edge type
        parser.add_argument(
            "edge", default=ctx.env.get("edge_type", EdgeTypes.default), type=valid_edge_type, nargs="?"
        )
        parsed = parser.parse_args(arg.split() if arg else [])
        return parsed.origin, parsed.edge
class SuccessorsPart(SearchCLIPart):
    """
    ```shell
    successors [--with-origin] [edge_type]
    ```
    This command extends an already existing search.
    It will select all successors of the currently selected nodes of the search.
    The graph may contain different types of edges (e.g. the `default` graph or the `delete` graph).
    In order to define which graph to walk, the edge_type can be specified.
    If --with-origin is specified, the current element is included in the result set as well.
    Assume node A with descendant B with descendant C: A --> B --> C `search id(A) | successors`
    will select B, while `search id(A) | successors --with-origin` will select A and B.
    ## Options
    - `--with-origin` [Optional, default to false]: includes the current element into the result set.
    ## Parameters
    - `edge_type` [Optional, default to `default`]: Defines the type of edge to navigate.
    ## Environment Variables
    - `edge_type` [Optional]: Defines the type of the edge to navigate.
    The parameter takes precedence over the env var.
    ## Examples
    ```shell
    > search is(volume_type) | successors | search is(volume)
    kind=gcp_disk, id=16, name=gke16, age=8mo29d, cloud=gcp, account=eng, region=us-west1, zone=us-west1-a
    kind=gcp_disk, id=26, name=gke26, age=8mo29d, cloud=gcp, account=eng, region=us-west1, zone=us-west1-a
    kind=aws_ec2_volume, id=vol1, name=vol1, age=2mo11d, cloud=aws, account=insights, region=us-west-2
    ```
    """

    @property
    def name(self) -> str:
        # CLI command name: `successors`
        return "successors"

    def info(self) -> str:
        # One-line summary shown in command overviews / help.
        return "Select successors of incoming nodes in the graph."

    def args_info(self) -> ArgsInfo:
        # Argument metadata used for shell completion and help rendering.
        return [
            ArgInfo("--with-origin"),
            ArgInfo(expects_value=True, possible_values=["default", "delete"], help_text="edge type"),
        ]
class AncestorsPart(SearchCLIPart):
    """
    ```shell
    ancestors [--with-origin] [edge_type]
    ```
    This command extends an already existing search.
    It will select all ancestors of the currently selected nodes of the search.
    The graph may contain different types of edges (e.g. the `default` graph or the `delete` graph).
    In order to define which graph to walk, the edge_type can be specified.
    If --with-origin is specified, the current element is included in the result set as well.
    Assume node A with descendant B with descendant C: A --> B --> C `search id(C) | ancestors`
    will select B and A, while `search id(C) | ancestors --with-origin` will select C and B and A.
    ## Options
    - `--with-origin` [Optional, default to false]: includes the current element into the result set.
    ## Parameters
    - `edge_type` [Optional, default to `default`]: Defines the type of edge to navigate.
    ## Environment Variables
    - `edge_type` [Optional]: Defines the type of the edge to navigate.
    The parameter takes precedence over the env var.
    ## Examples
    ```shell
    > search is(volume_type) limit 1 | ancestors
    kind=gcp_service_sku, id=D2, name=Storage PD Capacity, age=5d8h, cloud=gcp, account=sre
    kind=gcp_zone, id=2, name=us-central1-a, age=52yr1mo, cloud=gcp, account=sre, region=us-central1, zone=us-central1-a
    kind=gcp_region, id=1000, name=us-central1, age=52yr1mo, cloud=gcp, account=sre, region=us-central1
    kind=gcp_service, id=6F81-5844-456A, name=Compute Engine, age=5d8h, cloud=gcp, account=sre
    kind=gcp_project, id=sre-tests, name=sre-tests, age=5d8h, cloud=gcp, account=sre
    kind=cloud, id=gcp, name=gcp, age=5d8h, cloud=gcp
    kind=graph_root, id=root, name=root
    ```
    """

    @property
    def name(self) -> str:
        # CLI command name: `ancestors`
        return "ancestors"

    def info(self) -> str:
        # One-line summary shown in command overviews / help.
        return "Select ancestors of incoming nodes in the graph."

    def args_info(self) -> ArgsInfo:
        # Argument metadata used for shell completion and help rendering.
        return [
            ArgInfo("--with-origin"),
            ArgInfo(expects_value=True, possible_values=["default", "delete"], help_text="edge type"),
        ]
class DescendantsPart(SearchCLIPart):
    """
    ```shell
    descendants [--with-origin] [edge_type]
    ```
    This command extends an already existing search.
    It will select all descendants of the currently selected nodes of the search.
    The graph may contain different types of edges (e.g. the `default` graph or the `delete` graph).
    In order to define which graph to walk, the edge_type can be specified.
    If --with-origin is specified, the current element is included in the result set as well.
    Assume node A with descendant B with descendant C: A --> B --> C `search id(A) | descendants`
    will select B and C, while `search id(A) | descendants --with-origin` will select A and B and C.
    ## Options
    - `--with-origin` [Optional, default to false]: includes the current element into the result set.
    ## Parameters
    - `edge_type` [Optional, default to `default`]: Defines the type of edge to navigate.
    ## Environment Variables
    - `edge_type` [Optional]: Defines the type of the edge to navigate.
    The parameter takes precedence over the env var.
    ## Examples
    ```shell
    > search is(volume_type) limit 1 | descendants --with-origin
    kind=gcp_disk_type, name=pd-standard, age=52yr1mo, cloud=gcp, account=sre, region=us-central1, zone=us-central1-a
    kind=gcp_disk, id=881, name=disk-1, age=1yr2mo, cloud=gcp, account=sre, region=us-central1, zone=us-central1-a
    ```
    """

    @property
    def name(self) -> str:
        # CLI command name: `descendants`
        return "descendants"

    def info(self) -> str:
        # One-line summary shown in command overviews / help.
        return "Select descendants of incoming nodes in the graph."

    def args_info(self) -> ArgsInfo:
        # Argument metadata used for shell completion and help rendering.
        return [
            ArgInfo("--with-origin"),
            ArgInfo(expects_value=True, possible_values=["default", "delete"], help_text="edge type"),
        ]
@define(slots=True)
class _AggregateIntermediateResult:
    # Mutable accumulator for one aggregation group while folding the stream.
    # number of stream elements that contributed to this group
    group_count: int = 0
    # function_result_name -> (value, count): the running aggregate value and how many
    # elements were folded into it (the count is presumably used to derive avg - confirm downstream)
    fn_values: Dict[str, tuple[Optional[int], int]] = field(factory=lambda: defaultdict(lambda: (None, 0)))
class AggregateCommand(SearchCLIPart):
"""
```shell
aggregate [group_prop, .., group_prop]: [function(), .. , function()]
```
This command extends an already existing search.
Using the results of a search by aggregating over given properties and applying given aggregation functions.
Aggregate data by using on of the following functions: `sum`, `avg`, `min`, `max` and `count`.
Multiple aggregation functions can be applied to the result set by separating them by comma.
Each aggregation function can be named via an optional `as <name>` clause.
Aggregation functions can be grouped using aggregation values.
Multiple grouping values can be defined by separating them via comma.
Each grouping variable can be renamed via an optional `as <name>` clause.
## Parameters
- `group_prop`: the name of the property to use for grouping.
Multiple grouping variables are possible, separated by comma.
Every grouping variable can be renamed via an as name directive (`prop as prop_name`).
- `function`: grouping function to be applied on every resulting node.
Following functions are possible: `sum`, `count`, `min`, `max`, `avg`.
The function contains the variable name (e.g.: min(path.to.prop))
It is possible to use static values (e.g.: sum(1))
It is possible to use simple math expressions in the function (e.g. min(path.to.prop * 3 + 2))
It is possible to name the result of this function (e.g. count(foo) as number_of_foos)
## Examples
```shell
# Count all volumes in the system based on the kind
> search is(volume) | aggregate kind as kind: sum(1) as count
group:
kind: aws_ec2_volume
count: 1799
---
group:
kind: gcp_disk
count: 1100
# Count all volumes in the system together with the complete volume size based on the kind
> search is(volume) | aggregate kind: sum(volume_size) as summed, sum(1) as count
group:
reported.kind: aws_ec2_volume
summed: 130903
count: 1799
---
group:
reported.kind: gcp_disk
summed: 23930
count: 1100
# Sum the available volume size without any group
> search is(volume) | aggregate sum(volume_size) as summed, sum(1) as count
summed: 154833
count: 2899
```
"""
@property
def name(self) -> str:
    """The identifier under which this command is invoked on the CLI."""
    command_name = "aggregate"
    return command_name
def info(self) -> str:
    """One-line description shown in command listings and help output."""
    description = "Aggregate the result of a search by the provided specification."
    return description
def args_info(self) -> ArgsInfo:
    """Describe the expected argument for completion and help rendering."""
    spec = ArgInfo(expects_value=True, value_hint="aggregate", help_text="aggregation specification")
    return [spec]
@staticmethod
async def aggregate_in(
    content: JsStream,
    group_props: Optional[List[str]] = None,
    fn_props: Optional[List[AggregateFunction]] = None,
) -> Dict[tuple[Any, ...], _AggregateIntermediateResult]:
    """
    Aggregate the elements in the stream, grouped by the provided group properties and
    aggregated by the given aggregate functions.

    :param content: the stream of JSON elements to aggregate
    :param group_props: the property paths to group by; all elements fall into one
                        group (keyed by the empty tuple) if None/empty
    :param fn_props: the aggregate functions to apply to every element
    :return: A dictionary with the tuple of group property values as key and the
             aggregate intermediate result as value
    """
    counter: Dict[tuple[Any, ...], _AggregateIntermediateResult] = defaultdict(_AggregateIntermediateResult)
    empty_tuple = ()

    def key_at(o: JsonElement, n: str) -> Any:
        # dicts/lists are not hashable -> use their JSON string form as group key
        prop_val = js_value_at(o, n)
        if isinstance(prop_val, (dict, list)):
            return json.dumps(prop_val)
        else:
            return prop_val

    def value_at(o: JsonElement, n: str) -> float:
        # Fix: accept any numeric value. Previously only ints were accepted,
        # so float-valued properties were silently aggregated as 0.
        # Non-numeric / missing values still contribute 0.
        prop_val = js_value_at(o, n)
        return prop_val if isinstance(prop_val, (int, float)) else 0

    def from_prop(o: JsonElement) -> Any:
        return tuple(key_at(o, n) for n in group_props)  # type: ignore

    def from_none(_: JsonElement) -> Any:
        return empty_tuple

    # no group properties -> every element maps to the same (empty) key
    key_getter = from_prop if group_props else from_none

    def exec_fn(fn: AggregateFunction, o: JsonElement, r: _AggregateIntermediateResult) -> Any:
        # an int function "name" is a static value (e.g. sum(1)); otherwise resolve the property path
        value = fn.name if isinstance(fn.name, int) else value_at(o, fn.name)
        name = fn.as_name or str(fn.name)
        ex_fn_val, ex_fn_count = r.fn_values[name]
        if fn.function in ("sum", "avg", "count"):
            # avg is accumulated as a running sum; the caller divides by the count later
            r.fn_values[name] = (combine_optional(ex_fn_val, value, lambda a, b: a + b), ex_fn_count + 1)
        elif fn.function == "min":
            if ex_fn_val is None or ex_fn_val > value:
                r.fn_values[name] = (value, ex_fn_count + 1)
        elif fn.function == "max":
            if ex_fn_val is None or ex_fn_val < value:
                r.fn_values[name] = (value, ex_fn_count + 1)
        else:
            raise ValueError(f"Unknown function {fn.function}")

    async for element in content:
        key = key_getter(element)
        current = counter[key]
        current.group_count += 1
        if fn_props:
            for fn in fn_props:
                exec_fn(fn, element, current)
    return counter
def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwargs: Any) -> CLIFlow:
    """Parse the aggregation specification and return a flow that aggregates the incoming stream.

    :param arg: the aggregation specification, e.g. ``kind as kind: sum(1) as count``.
    :param ctx: the CLI context; used to resolve variable names against the current section.
    :return: a CLIFlow that emits one JSON object per aggregation group.
    """
    # read aggregate and interpret all variables in the same section as the rest of the command
    aggregate = Aggregate(*aggregate_parameter_parser.parse(arg)).change_variable(ctx.variable_in_section)
    # flat list of all grouping variable names, in specification order
    var_names: List[str] = [n for gv in aggregate.group_by for n in gv.all_names()]
    # lookup the function by result name
    fn_by_name = {fn.as_name or str(fn.name): fn.function for fn in aggregate.group_func}

    def group(keys: tuple[Any, ...]) -> Json:
        # keys arrive in the same order as var_names (produced by aggregate_in) -> zip into a name lookup
        lookup = dict(zip(var_names or [], keys))
        result = {}
        for av in aggregate.group_by:
            if isinstance(av.name, AggregateVariableCombined):
                # combined variable: concatenate literal parts with resolved variable values;
                # missing values are rendered as the string "null"
                value = ""
                for p in av.name.parts:
                    if isinstance(p, AggregateVariableName):
                        v = lookup[p.name]
                        value += str(v) if v is not None else "null"
                    else:
                        value += p
            else:
                value = lookup[av.name.name]
            result[av.as_name or ".".join(av.all_names())] = value
        return result

    async def aggregate_data(content: JsStream) -> AsyncIterator[JsonElement]:
        async with content.stream() as in_stream:
            for key, value in (await self.aggregate_in(in_stream, var_names, aggregate.group_func)).items():
                entry: Json = {"group": group(key)}
                for fn_name, (fn_val, fn_count) in value.fn_values.items():
                    # avg is accumulated as a running sum in aggregate_in -> divide by the count here
                    if fn_by_name.get(fn_name) == "avg" and fn_val is not None and fn_count > 0:
                        fn_val = fn_val / fn_count  # type: ignore
                    entry[fn_name] = fn_val
                yield entry

    # noinspection PyTypeChecker
    return CLIFlow(aggregate_data)
class HeadCommand(SearchCLIPart):
    """
    ```shell
    head [-num]
    ```
    Take [num] number of elements from the input stream and send them downstream.
    The rest of the stream is discarded.
    Note: using a search, the same result can be achieved using `sort` and `limit`.
    ## Options
    - `-num` [optional, defaults to 100]: the number of elements to take from the head.
    ## Examples
    ```shell
    # Json array with 5 elements is defined. We only take the first 2 elements.
    > json [1,2,3,4,5] | head -2
    1
    2
    # A search is performed to select all volumes. Only the first 2 results are taken.
    > search is(volume) | head -2
    kind=gcp_disk, id=12, name=gke-1, age=5mo26d, cloud=gcp, account=eng, region=us-central1, zone=us-central1-c
    kind=gcp_disk, id=34, name=pvc-2, age=4mo16d, cloud=gcp, account=dev, region=us-west1, zone=us-west1-a
    ```
    ## Related
    - `tail` - take the last number of elements.
    """

    @property
    def name(self) -> str:
        # command identifier on the CLI
        return "head"

    def info(self) -> str:
        # short description for command listings
        return "Return n first elements of the stream."

    def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwargs: Any) -> CLIAction:
        # resolve the element count once, then build the flow around it
        count = self.parse_size(arg)

        def take_first(in_stream):
            return stream.take(in_stream, count)

        return CLIFlow(take_first)

    def args_info(self) -> ArgsInfo:
        # single optional numeric argument
        argument = ArgInfo(expects_value=True, help_text="number of elements to take")
        return [argument]

    @staticmethod
    def parse_size(arg: Optional[str]) -> int:
        # default to 100 elements; the argument is given as "-num", hence abs()
        if not arg:
            return 100
        return abs(int(arg))