-
Notifications
You must be signed in to change notification settings - Fork 81
/
command_test.py
1454 lines (1220 loc) · 62.7 KB
/
command_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import asyncio
import json
import logging
import os
import sqlite3
from datetime import timedelta
from functools import partial
from typing import List, Dict, Optional, Any, Tuple, Type, TypeVar, cast, Callable, Set
import pytest
import yaml
from _pytest.logging import LogCaptureFixture
from aiohttp import ClientTimeout
from aiohttp.web import Request
from aiostream import stream, pipe
from attrs import evolve
from pytest import fixture
from resotocore import version
from resotocore.cli import is_node, JsStream, list_sink
from resotocore.cli.cli import CLIService
from resotocore.cli.command import HttpCommand, JqCommand, AggregateCommand, all_commands
from resotocore.cli.model import CLIContext, WorkerCustomCommand, CLI, FilePath
from resotocore.cli.tip_of_the_day import generic_tips
from resotocore.console_renderer import ConsoleRenderer, ConsoleColorSystem
from resotocore.db.graphdb import ArangoGraphDB
from resotocore.db.jobdb import JobDb
from resotocore.dependencies import TenantDependencies
from resotocore.error import CLIParseError
from resotocore.graph_manager.graph_manager import GraphManager
from resotocore.ids import InfraAppName, GraphName
from resotocore.infra_apps.package_manager import PackageManager
from resotocore.infra_apps.runtime import Runtime
from resotocore.model.model import Model
from resotocore.model.typed_model import to_js
from resotocore.query.model import Template, Query
from resotocore.report import Inspector
from resotocore.task.task_description import TimeTrigger, Workflow, EventTrigger
from resotocore.task.task_handler import TaskHandlerService
from resotocore.types import JsonElement, Json
from resotocore.user import UsersConfigId
from resotocore.util import AccessJson, utc_str, utc
from resotocore.worker_task_queue import WorkerTask
from tests.resotocore.util_test import not_in_path
@fixture
def json_source() -> str:
nums = ",".join([f'{{ "num": {a}, "inner": {{"num": {a%10}}}}}' for a in range(0, 100)])
return "json [" + nums + "," + nums + "]"
def test_known_category(dependencies: TenantDependencies) -> None:
allowed_categories = {"search", "format", "action", "setup", "misc"}
for cmd in all_commands(dependencies):
assert cmd.category in allowed_categories, f"Unknown category {cmd.category} for command {cmd.name}"
@pytest.mark.asyncio
async def test_echo_source(cli: CLI) -> None:
# no arg passed to json
result = await cli.execute_cli_command("echo", list_sink)
assert result[0] == [""]
# simple string passed to json
result = await cli.execute_cli_command("echo this is a string", list_sink)
assert result[0] == ["this is a string"]
result = await cli.execute_cli_command('echo "foo bla bar" ', list_sink)
assert result[0] == ["foo bla bar"]
@pytest.mark.asyncio
async def test_json_source(cli: CLI) -> None:
# json object passed to json
result = await cli.execute_cli_command('json {"a": 1}', list_sink)
assert result[0] == [{"a": 1}]
# json array passed to json
result = await cli.execute_cli_command('json [{"a": 1}, {"b":2}]', list_sink)
assert result[0] == [{"a": 1}, {"b": 2}]
# json string passed to json
result = await cli.execute_cli_command('json "foo bla bar"', list_sink)
assert result[0] == ["foo bla bar"]
@pytest.mark.asyncio
async def test_predecessors(cli: CLI) -> None:
r1 = await cli.execute_cli_command("search id(4_0) | predecessors", list_sink)
assert len(r1[0]) == 1
r2 = await cli.execute_cli_command("search id(4_0) | predecessors --with-origin", list_sink)
assert len(r2[0]) == 2
r3 = await cli.execute_cli_command("search id(4_0) | predecessors --with-origin default", list_sink)
assert len(r3[0]) == 2
r4 = await cli.execute_cli_command("search id(4_0) | predecessors delete", list_sink)
assert len(r4[0]) == 0
@pytest.mark.asyncio
async def test_ancestors(cli: CLI) -> None:
r1 = await cli.execute_cli_command("search id(4_0) | ancestors", list_sink)
assert len(r1[0]) == 4
r2 = await cli.execute_cli_command("search id(4_0) | ancestors --with-origin", list_sink)
assert len(r2[0]) == 5
r3 = await cli.execute_cli_command("search id(4_0) | ancestors --with-origin default", list_sink)
assert len(r3[0]) == 5
r4 = await cli.execute_cli_command("search id(4_0) | ancestors delete", list_sink)
assert len(r4[0]) == 0
@pytest.mark.asyncio
async def test_successors(cli: CLI) -> None:
r1 = await cli.execute_cli_command("search id(4) | successors", list_sink)
assert len(r1[0]) == 10
r2 = await cli.execute_cli_command("search id(4) | successors --with-origin", list_sink)
assert len(r2[0]) == 11
r3 = await cli.execute_cli_command("search id(4) | successors --with-origin default", list_sink)
assert len(r3[0]) == 11
r4 = await cli.execute_cli_command("search id(4) | successors delete", list_sink)
assert len(r4[0]) == 0
@pytest.mark.asyncio
async def test_descendants(cli: CLI) -> None:
r1 = await cli.execute_cli_command("search id(4) | descendants", list_sink)
assert len(r1[0]) == 10
r2 = await cli.execute_cli_command("search id(4) | descendants --with-origin", list_sink)
assert len(r2[0]) == 11
r3 = await cli.execute_cli_command("search id(4) | descendants --with-origin default", list_sink)
assert len(r3[0]) == 11
r4 = await cli.execute_cli_command("search id(4) | descendants delete", list_sink)
assert len(r4[0]) == 0
@pytest.mark.asyncio
async def test_search_source(cli: CLIService) -> None:
result = await cli.execute_cli_command('search is("foo") and some_int==0 --> identifier=~"9_"', list_sink)
assert len(result[0]) == 10
await cli.dependencies.template_expander.put_template(
Template("test", 'is(foo) and some_int==0 --> identifier=~"{{fid}}"')
)
result2 = await cli.execute_cli_command('search expand(test, fid="9_")', list_sink)
assert len(result2[0]) == 10
result3 = await cli.execute_cli_command("search --with-edges is(graph_root) -[0:1]->", list_sink)
# node: graph_root
# node: collector
# edge: graph_root -> collector
# -----------------------------
# = 3 elements
assert len(result3[0]) == 3
result4 = await cli.execute_cli_command("search --explain --with-edges is(graph_root) -[0:1]->", list_sink)
assert result4[0][0]["rating"] == "simple"
# use absolute path syntax
result5 = await cli.execute_cli_command(
"search aggregate(/reported.kind: sum(/reported.some_int) as si): "
"is(foo) and not(/reported.some_int!=0) "
"{child: --> /metadata!=null} some_int==0 "
"with(any, --> /metadata!=null) sort /reported.name asc limit 1",
list_sink,
)
assert result5 == [[{"group": {"kind": "foo"}, "si": 0}]]
@pytest.mark.asyncio
async def test_sleep_source(cli: CLI) -> None:
with pytest.raises(CLIParseError):
await cli.evaluate_cli_command("sleep forever")
result = await cli.execute_cli_command("sleep 0.001; echo hello", list_sink)
assert result == [[""], ["hello"]]
@pytest.mark.asyncio
async def test_count_command(cli: CLI, json_source: str) -> None:
# count instances
result = await cli.execute_cli_command(f"{json_source} | count", list_sink)
assert len(result[0]) == 2
assert result[0] == ["total matched: 200", "total unmatched: 0"]
# count attributes
result = await cli.execute_cli_command(f"{json_source} | count num", list_sink)
assert len(result[0]) == 102
assert result[0][-2] == "total matched: 200"
assert result[0][-1] == "total unmatched: 0"
# count attributes with path
result = await cli.execute_cli_command(f"{json_source} | count inner.num", list_sink)
assert len(result[0]) == 12
assert result[0][-2] == "total matched: 200"
assert result[0][-1] == "total unmatched: 0"
# count unknown attributes
result = await cli.execute_cli_command(f"{json_source} | count does_not_exist", list_sink)
assert len(result[0]) == 2
assert result[0] == ["total matched: 0", "total unmatched: 200"]
@pytest.mark.asyncio
async def test_head_command(cli: CLI) -> None:
assert await cli.execute_cli_command("json [1,2,3,4,5] | head 2 | dump", list_sink) == [[1, 2]]
assert await cli.execute_cli_command("json [1,2,3,4,5] | head -2 | dump", list_sink) == [[1, 2]]
assert await cli.execute_cli_command("json [1,2,3,4,5] | head | dump", list_sink) == [[1, 2, 3, 4, 5]]
@pytest.mark.asyncio
async def test_tail_command(cli: CLI) -> None:
assert await cli.execute_cli_command("json [1,2,3,4,5] | tail 2 | dump", list_sink) == [[4, 5]]
assert await cli.execute_cli_command("json [1,2,3,4,5] | tail -2 | dump", list_sink) == [[4, 5]]
assert await cli.execute_cli_command("json [1,2,3,4,5] | tail | dump", list_sink) == [[1, 2, 3, 4, 5]]
@pytest.mark.asyncio
async def test_chunk_command(cli: CLI, json_source: str) -> None:
result: List[List[str]] = await cli.execute_cli_command(f"{json_source} | chunk 50 | dump", list_sink)
assert len(result[0]) == 4 # 200 in chunks of 50
for a in result[0]:
assert len(a) == 50
@pytest.mark.asyncio
async def test_flatten_command(cli: CLI, json_source: str) -> None:
result = await cli.execute_cli_command(f"{json_source} | chunk 50 | flatten", list_sink)
assert len(result[0]) == 200
@pytest.mark.asyncio
async def test_uniq_command(cli: CLI, json_source: str) -> None:
result = await cli.execute_cli_command(f"{json_source} | uniq", list_sink)
assert len(result[0]) == 100
@pytest.mark.asyncio
async def test_set_desired_command(cli: CLI) -> None:
result = await cli.execute_cli_command('search is("foo") | set_desired a="test" b=1 c=true | dump', list_sink)
assert len(result[0]) == 10
for elem in result[0]:
assert {"a": "test", "b": 1, "c": True}.items() <= elem["desired"].items()
@pytest.mark.asyncio
async def test_set_metadata_command(cli: CLI) -> None:
result = await cli.execute_cli_command('search is("foo") | set_metadata a="test" b=1 c=true | dump', list_sink)
assert len(result[0]) == 10
for elem in result[0]:
assert {"a": "test", "b": 1, "c": True}.items() <= elem["metadata"].items()
@pytest.mark.asyncio
async def test_clean_command(cli: CLI) -> None:
result = await cli.execute_cli_command('search is("foo") | clean | dump', list_sink)
assert len(result[0]) == 10
for elem in result[0]:
assert {"clean": True}.items() <= elem["desired"].items()
@pytest.mark.asyncio
async def test_protect_command(cli: CLI) -> None:
result = await cli.execute_cli_command('search is("foo") | protect | dump', list_sink)
assert len(result[0]) == 10
for elem in result[0]:
assert {"protected": True}.items() <= elem["metadata"].items()
@pytest.mark.asyncio
async def test_list_sink(cli: CLI, dependencies: TenantDependencies) -> None:
result = await cli.execute_cli_command("json [1,2,3] | dump", list_sink)
assert result == [[1, 2, 3]]
@pytest.mark.asyncio
async def test_flat_sink(cli: CLI) -> None:
parsed = await cli.evaluate_cli_command("json [1,2,3] | dump; json [4,5,6] | dump; json [7,8,9] | dump")
expected = [1, 2, 3, 4, 5, 6, 7, 8, 9]
assert await stream.list(stream.iterate((await p.execute())[1] for p in parsed) | pipe.concat()) == expected
@pytest.mark.asyncio
async def test_format(cli: CLI) -> None:
# access properties by name and path
result = await cli.execute_cli_command(
'json {"a":"b", "b": {"c":"d"}} | format a:{a} b:{b.c} na:{fuerty}', list_sink
)
assert result[0] == ["a:b b:d na:null"]
# use correct type
props = dict(a="a", b=True, c=False, d=None, e=12, f=1.234)
result = await cli.execute_cli_command(f"json {json.dumps(props)}" " | format {a}:{b}:{c}:{d}:{e}:{f}", list_sink)
assert result[0] == ["a:true:false:null:12:1.234"]
# access deeply nested properties with dict and array
result = await cli.execute_cli_command(
'json {"a":{"b":{"c":{"d":[0,1,2, {"e":"f"}]}}}} | format will be an >{a.b.c.d[3].e}<', list_sink
)
assert result[0] == ["will be an >f<"]
# make sure any path that is not available leads to the null value
result = await cli.execute_cli_command("json {} | format {a}:{b.c.d}:{foo.bla[23].test}", list_sink)
assert result[0] == ["null:null:null"]
# Queries that use the reported section, also interpret the format in the reported section
result = await cli.execute_cli_command(
"search id(sub_root) limit 1 | format {{aa}} {some_string} test}} {some_int} {/metadata.node_id} {{",
list_sink,
)
assert result[0] == ["{aa} hello test} 0 sub_root {"]
@pytest.mark.asyncio
async def test_workflows_command(cli: CLIService, task_handler: TaskHandlerService, test_workflow: Workflow) -> None:
async def execute(cmd: str) -> List[JsonElement]:
ctx = CLIContext(cli.cli_env)
return (await cli.execute_cli_command(cmd, list_sink, ctx))[0] # type: ignore
assert await execute("workflows list") == ["sleep_workflow", "wait_for_collect_done", "test_workflow"]
assert await execute("workflows show test_workflow") == [to_js(test_workflow)]
wf = await execute("workflows run test_workflow")
assert wf[0].startswith("Workflow test_workflow started with id") # type: ignore
running = await execute("workflows running")
assert len(running) == 1
# executing an already running workflow will give a specific message
await execute("workflows run sleep_workflow")
sf = await execute("workflows run sleep_workflow")
assert sf[0].startswith("Workflow sleep_workflow already running with id ") # type: ignore
# a workflow task can be stopped
task_id = running[0]["task-id"] # type: ignore
af = await execute(f"workflows stop {task_id}")
assert af[0] == f"Workflow Task {task_id} stopped."
# make sure to wait for all tasks to finish
for rt in await task_handler.running_tasks():
await task_handler.delete_running_task(rt)
# access the history of all workflows
history = AccessJson.wrap_list(await execute("workflows history"))
assert len(history) == 1
assert history[0].sleep_workflow.count == 1
assert history[0].test_workflow.count == 1
# access the history of a specific workflow
history_test = AccessJson.wrap_list(await execute("workflows history test_workflow"))
assert len(history_test) == 1
wf_run = history_test[0]
assert all(n in wf_run for n in ["id", "task_started_at", "duration"])
# access the log of a specific workflow run
task_log = await execute(f"workflows log {wf_run['id']}")
assert len(task_log) == 1
@pytest.mark.asyncio
async def test_jobs_command(cli: CLIService, task_handler: TaskHandlerService, job_db: JobDb) -> None:
async def execute(cmd: str) -> List[List[JsonElement]]:
ctx = CLIContext(cli.cli_env)
return await cli.execute_cli_command(cmd, list_sink, ctx)
# add job with schedule
result = await execute('jobs add --id hello --schedule "23 1 * * *" echo Hello World @NOW@')
assert result == [["Job hello added."]]
job = await job_db.get("hello")
assert job is not None
assert job.command.command == "echo Hello World @NOW@"
assert job.trigger == TimeTrigger("23 1 * * *")
assert job.wait is None
assert job in task_handler.task_descriptions
assert job.environment == {"graph": "ns", "section": "reported"}
# add job with schedule and event
with_event = await execute('jobs add --id timed_hi --schedule "23 1 * * *" --wait-for-event foo echo Hello World')
assert with_event == [["Job timed_hi added."]]
job_with_event: Job = await job_db.get("timed_hi") # type: ignore
assert job_with_event.wait is not None
event_trigger, timeout = job_with_event.wait
assert event_trigger.message_type == "foo"
assert timeout == timedelta(hours=1)
assert job_with_event.environment == {"graph": "ns", "section": "reported"}
assert job_with_event in task_handler.task_descriptions
# add job with event
only_event = await execute("jobs add --id only_event --wait-for-event foo echo Hello World")
assert only_event == [["Job only_event added."]]
job_only_event: Job = await job_db.get("only_event") # type: ignore
assert job_only_event.trigger == EventTrigger("foo")
assert job_only_event.wait is None
assert job_only_event.environment == {"graph": "ns", "section": "reported"}
assert job_only_event in task_handler.task_descriptions
# add job without any trigger
no_trigger = await execute("jobs add --id no_trigger echo Hello World")
assert no_trigger == [["Job no_trigger added."]]
job_no_trigger: Job = await job_db.get("no_trigger") # type: ignore
assert job_no_trigger.wait is None
assert job_no_trigger.environment == {"graph": "ns", "section": "reported"}
assert job_no_trigger in task_handler.task_descriptions
# deactivate timed_hi
deactivated = await execute("jobs deactivate timed_hi")
assert deactivated[0][0]["active"] is False # type: ignore
# activate timed_hi
activated = await execute("jobs activate timed_hi")
assert activated[0][0]["active"] is True # type: ignore
# show specific job
no_trigger_show = await execute("jobs show no_trigger")
assert len(no_trigger_show[0]) == 1
# show all jobs
all_jobs = await execute("jobs list")
assert len(all_jobs[0]) == 4
# start the job
run_hello = await execute("jobs run timed_hi")
assert run_hello[0][0].startswith("Job timed_hi started with id") # type: ignore
assert [t for t in await task_handler.running_tasks() if t.descriptor.id == "timed_hi"]
# list all running jobs
all_running = await execute("jobs running")
assert [r["job"] for r in all_running[0]] == ["timed_hi"] # type: ignore
# delete a job
deleted = await execute("jobs delete timed_hi")
assert deleted == [["Job timed_hi deleted."]]
@pytest.mark.asyncio
async def test_tag_command(
cli: CLIService, performed_by: Dict[str, List[str]], incoming_tasks: List[WorkerTask], caplog: LogCaptureFixture
) -> None:
counter = 0
def nr_of_performed() -> int:
nonlocal counter
performed = len(performed_by)
increase = performed - counter
counter = performed
return increase
nr_of_performed() # reset to 0
assert await cli.execute_cli_command("echo id_does_not_exist | tag update foo bla", list_sink) == [[]]
assert nr_of_performed() == 0
res1 = await cli.execute_cli_command(
'json ["root", "collector"] | tag update foo "bla_{reported.some_int}" | dump', list_sink
)
assert nr_of_performed() == 2
assert {a["id"] for a in res1[0]} == {"root", "collector"}
assert len(incoming_tasks) == 2
# check that the worker task data is correct
data = AccessJson(incoming_tasks[0].data)
assert data["update"] is not None # tag update -> data.update is defined
assert not data.node.reported.is_none # the node reported section is defined
assert not data.node.metadata.is_none # the node metadata section is defined
assert not data.node.ancestors.cloud.reported.is_none # the ancestors cloud section is defineda
assert data["update"].foo == "bla_0" # using the renderer bla_{reported.some_int}
res2 = await cli.execute_cli_command('search is("foo") | tag update foo bla', list_sink)
assert nr_of_performed() == 10
assert len(res2[0]) == 10
res2_tag_no_val = await cli.execute_cli_command('search is("foo") | tag update foobar', list_sink)
assert nr_of_performed() == 10
assert len(res2_tag_no_val[0]) == 10
res3 = await cli.execute_cli_command('search is("foo") | tag delete foo', list_sink)
assert nr_of_performed() == 10
assert len(res3[0]) == 10
with caplog.at_level(logging.WARNING):
caplog.clear()
res4 = await cli.execute_cli_command('search is("bla") limit 2 | tag delete foo', list_sink)
assert nr_of_performed() == 2
assert len(res4[0]) == 2
# make sure that 2 warnings are emitted
assert len(caplog.records) == 2
for res in caplog.records:
assert res.message.startswith("Update not reflected in db. Wait until next collector run.")
# tag updates can be put into background
res6 = await cli.execute_cli_command('json ["root", "collector"] | tag update --nowait foo bla', list_sink)
assert cli.dependencies.forked_tasks.qsize() == 2
for res in res6[0]:
# in this case a message with the task id is emitted
assert res.startswith("Spawned WorkerTask tag:") # type:ignore
# and the real result is found when the forked task is awaited, which happens by the CLI reaper
awaitable, info = await cli.dependencies.forked_tasks.get()
assert (await awaitable)["id"] in ["root", "collector"] # type:ignore
@pytest.mark.asyncio
async def test_kinds_command(cli: CLI, foo_model: Model) -> None:
result = await cli.execute_cli_command("kind", list_sink)
for kind in ["account", "bla", "child", "cloud", "parent", "region", "some_complex"]:
assert kind in result[0]
result = await cli.execute_cli_command("kind foo", list_sink)
assert result[0][0] == {
"name": "foo",
"bases": ["base"],
"properties": {
"age": "duration",
"ctime": "datetime",
"identifier": "string",
"kind": "string",
"name": "string",
"now_is": "datetime",
"some_int": "int32",
"some_string": "string",
},
"successors": ["bla"],
}
result = await cli.execute_cli_command("kind string", list_sink)
assert result[0][0] == {"name": "string", "runtime_kind": "string"}
result = await cli.execute_cli_command("kind -p reported.ctime", list_sink)
assert result[0][0] == {
"name": "datetime",
"runtime_kind": "datetime",
"appears_in": [
"base",
"foo",
"bla",
"cloud",
"account",
"region",
"parent",
"child",
"some_complex",
"predefined_properties",
],
}
with pytest.raises(Exception):
await cli.execute_cli_command("kind foo bla bar", list_sink)
@pytest.mark.asyncio
async def test_sort_command(cli: CLI) -> None:
async def identifiers(query: str) -> List[str]:
result = await cli.execute_cli_command(query + " | dump", list_sink)
return [r["reported"]["identifier"] for r in result[0]]
id_wo = await identifiers("search is(bla) | sort identifier")
id_asc = await identifiers("search is(bla) | sort identifier asc")
id_desc = await identifiers("search is(bla) | sort identifier desc")
id_kind = await identifiers("search is(bla) | sort identifier | sort kind")
assert id_wo == id_asc
assert id_wo == id_kind
assert id_asc == list(reversed(id_desc))
@pytest.mark.asyncio
async def test_limit_command(cli: CLI) -> None:
async def identifiers(query: str) -> List[str]:
result = await cli.execute_cli_command(query + " | dump", list_sink)
return [r["reported"]["identifier"] for r in result[0]]
assert await identifiers("search is(bla) sort identifier | limit 1") == ["0_0"]
assert await identifiers("search is(bla) sort identifier | limit 2") == ["0_0", "0_1"]
assert await identifiers("search is(bla) sort identifier | limit 2, 2") == ["0_2", "0_3"]
assert await identifiers("search is(bla) sort identifier | limit 10, 2") == ["1_0", "1_1"]
assert await identifiers("search is(bla) sort identifier | limit 100, 2") == []
@pytest.mark.asyncio
async def test_list_command(cli: CLI) -> None:
result = await cli.execute_cli_command('search is (foo) and identifier=="4" sort some_int | list', list_sink)
assert len(result[0]) == 1
assert result[0][0].startswith("kind=foo, identifier=4, some_int=0, age=")
list_cmd = "list some_int as si, some_string"
result = await cli.execute_cli_command(f'search is (foo) and identifier=="4" | {list_cmd}', list_sink)
assert result[0] == ["si=0, some_string=hello"]
# list is added automatically when no output renderer is defined and has the same behaviour as if it was given
result = await cli.execute_cli_command('search is (foo) and identifier=="4" sort some_int', list_sink)
assert result[0][0].startswith("kind=foo, identifier=4, some_int=0, age=")
# List is using the correct type
props = dict(id="test", a="a", b=True, c=False, d=None, e=12, f=1.234, reported={})
result = await cli.execute_cli_command(f"json {json.dumps(props)} | list a,b,c,d,e,f", list_sink)
assert result[0] == ["a=a, b=true, c=false, e=12, f=1.234"]
# Queries that use the reported section, also interpret the list format in the reported section
result = await cli.execute_cli_command(
"search id(sub_root) limit 1 | list some_string, some_int, /metadata.node_id", list_sink
)
assert result[0] == ["some_string=hello, some_int=0, node_id=sub_root"]
# List supports csv output
result = await cli.execute_cli_command(
f"json {json.dumps(props)} | list --csv a,`b`,c,`d`,e,`f`,non_existent", list_sink
)
assert result[0] == ['"a","b","c","d","e","f","non_existent"', '"a",True,False,"",12,1.234,""']
# List supports markdown output
result = await cli.execute_cli_command(
f"json {json.dumps(props)} | list --markdown a,b,c,d,e,f,non_existent", list_sink
)
assert result[0] == [
"|a|b |c |d |e |f |non_existent|",
"|-|----|-----|----|--|-----|------------|",
"|a|true|false|null|12|1.234|null |",
]
# List supports markdown output
result = await cli.execute_cli_command(
'json {"id": "foo", "reported":{}, "name": "a", "some_int": 1, "tags": {"foo․bla․bar.test.rest.best.":"yup"}} | list --json-table name, some_int, tags.`foo․bla․bar.test.rest.best.`',
list_sink,
)
assert result[0] == [
{
"columns": [
{"display": "Name", "kind": "string", "name": "name", "path": "/name"},
{"display": "Some Int", "kind": "int32", "name": "some_int", "path": "/some_int"},
{
"display": "Foo․bla․bar.test.rest.best.",
"kind": "string",
"name": "foo․bla․bar.test.rest.best.",
"path": "/tags.`foo․bla․bar.test.rest.best.`",
},
],
},
{"id": "foo", "row": {"foo․bla․bar.test.rest.best.": "yup", "name": "a", "some_int": 1}},
]
# List supports only markdown or csv, but not both at the same time
with pytest.raises(CLIParseError):
await cli.execute_cli_command(f"json {json.dumps(props)}" " | list --csv --markdown", list_sink)
# List command will make sure to make the column name unique
props = dict(id="123", reported=props, ancestors={"account": {"reported": props}})
result = await cli.execute_cli_command(
f"json {json.dumps(props)} | list reported.a, reported.b as a, reported.c as a, reported.c, "
f"ancestors.account.reported.a, ancestors.account.reported.a, ancestors.account.reported.a as foo",
list_sink,
)
# b as a ==> b, c as a ==> c, c ==> c_1, ancestors.account.reported.a ==> account_a, again ==> _1
assert result[0][0] == "a=a, b=true, c=false, c_1=false, account_a=a, account_a_1=a, foo=a"
# source context is passed correctly
parsed = await cli.evaluate_cli_command("search is (bla) | head 10 | list")
src_ctx, gen = await parsed[0].execute()
assert src_ctx.count == 10
assert src_ctx.total_count == 100
@pytest.mark.asyncio
async def test_jq_command(cli: CLI) -> None:
ctx = CLIContext(env={"section": "reported"}, query=Query.by("test"))
# .test -> .reported.test
assert JqCommand.rewrite_props(".a,.b", ctx) == ".reported.a,.reported.b"
# absolute paths are rewritten correctly
assert JqCommand.rewrite_props("./reported", ctx) == ".reported"
# object construction is supported
assert JqCommand.rewrite_props("{a:.a, b:.b}", ctx) == "{a:.reported.a, b:.reported.b}"
# no replacement after pipe
assert JqCommand.rewrite_props("map(.color) | {a:.a, b:.b}", ctx) == "map(.reported.color) | {a:.a, b:.b}"
assert (
JqCommand.rewrite_props(".pod_status.container_statuses[].image_id", ctx)
== ".reported.pod_status.container_statuses[].image_id"
)
result = await cli.execute_cli_command('json {"a":{"b":1}} | jq ".a.b"', list_sink)
assert len(result[0]) == 1
assert result[0][0] == 1
# allow absolute paths as json path
result = await cli.execute_cli_command('json {"id":"123", "reported":{"b":1}} | jq "./reported"', list_sink)
assert result == [[{"b": 1}]]
# jq .kind is rewritten as .reported.kind
result = await cli.execute_cli_command("search is(foo) limit 2 | jq .kind", list_sink)
assert result[0] == ["foo", "foo"]
@pytest.mark.asyncio
async def test_execute_search_command(cli: CLI) -> None:
# regression test: this used to fail because the arg could not be parsed
await cli.execute_cli_command('execute_search (b= "0")', list_sink)
@pytest.mark.asyncio
async def test_aggregation_to_count_command(cli: CLI) -> None:
r = await cli.execute_cli_command("search all | count kind", list_sink)
assert set(r[0]) == {
"graph_root: 1",
"cloud: 1",
"account: 1",
"foo: 10",
"bla: 100",
"total matched: 113",
"total unmatched: 0",
}
# exactly the same command as above (above search would be rewritten as this)
r = await cli.execute_cli_command(
"execute_search aggregate(reported.kind as name: sum(1) as count):all sort count asc | aggregate_to_count",
list_sink,
)
assert set(r[0]) == {
"graph_root: 1",
"cloud: 1",
"account: 1",
"foo: 10",
"bla: 100",
"total matched: 113",
"total unmatched: 0",
}
@pytest.mark.skipif(not_in_path("arangodump"), reason="requires arangodump to be in path")
@pytest.mark.asyncio
async def test_system_backup_command(cli: CLI) -> None:
async def check_backup(res: JsStream) -> None:
async with res.stream() as streamer:
only_one = True
async for s in streamer:
path = FilePath.from_path(s)
assert path.local.exists()
# backup should have size between 30k and 1500k (adjust size if necessary)
assert 30000 < path.local.stat().st_size < 1500000
assert only_one
only_one = False
await cli.execute_cli_command("system backup create", check_backup)
@pytest.mark.asyncio
async def test_system_info_command(cli: CLI) -> None:
info = AccessJson.wrap_object((await cli.execute_cli_command("system info", list_sink))[0][0])
assert info.version == version()
assert info.name == "resotocore"
assert info.cpus > 0
@pytest.mark.skipif(not_in_path("arangodump", "arangorestore"), reason="requires arangodump and arangorestore")
@pytest.mark.asyncio
async def test_system_restore_command(cli: CLI, tmp_directory: str) -> None:
backup = os.path.join(tmp_directory, "backup")
async def move_backup(res: JsStream) -> None:
async with res.stream() as streamer:
async for s in streamer:
path = FilePath.from_path(s)
os.rename(path.local, backup)
await cli.execute_cli_command("system backup create", move_backup)
ctx = CLIContext(uploaded_files={"backup": backup})
restore = await cli.execute_cli_command(f"BACKUP_NO_SYS_EXIT=true system backup restore {backup}", list_sink, ctx)
assert restore == [
[
"Database has been restored successfully!",
"Since all data has changed in the database eventually, this service needs to be restarted!",
]
]
@pytest.mark.asyncio
async def test_configs_command(cli: CLI, tmp_directory: str) -> None:
config_file = os.path.join(tmp_directory, "config.yml")
async def check_file_is_yaml(res: JsStream) -> None:
async with res.stream() as streamer:
async for s in streamer:
assert isinstance(s, str)
with open(s, "r") as file:
yaml.safe_load(file.read())
# create a new config entry
create_result = await cli.execute_cli_command("configs set test_config t1=1, t2=2, t3=3 ", list_sink)
assert create_result[0][0] == "t1: 1\nt2: 2\nt3: 3\n"
# show the entry - should be the same as the created one
show_result = await cli.execute_cli_command("configs show test_config", list_sink)
assert show_result[0][0] == "t1: 1\nt2: 2\nt3: 3\n"
# list all configs: only one is defined
list_result = await cli.execute_cli_command("configs list", list_sink)
assert list_result[0] == ["test_config"]
# copy the config
await cli.execute_cli_command("configs copy test_config test_config_copy", list_sink)
list_result = await cli.execute_cli_command("configs list", list_sink)
assert list_result[0] == ["test_config", "test_config_copy"]
# edit the config: will make the config available as file
await cli.execute_cli_command("configs edit test_config", check_file_is_yaml)
# update the config
update_doc = "a: '1'\nb: 2\nc: true\nd: null\n"
with open(config_file, "w") as file:
file.write(update_doc)
ctx = CLIContext(uploaded_files={"config.yaml": config_file})
update_result = await cli.execute_cli_command(f"configs update test_config {config_file}", list_sink, ctx)
assert update_result == [[]]
# show the entry - should be the same as the created one
show_updated_result = await cli.execute_cli_command("configs show test_config", list_sink)
assert show_updated_result[0][0] == update_doc
# write a env var substitution to the config
env_var_update = "foo: $(FOO)\n"
with open(config_file, "w") as file:
file.write(env_var_update)
ctx = CLIContext(uploaded_files={"config.yaml": config_file})
update_result = await cli.execute_cli_command(f"configs update test_config {config_file}", list_sink, ctx)
# provide the env var
os.environ["FOO"] = "bar"
# check the configs: the env var should stay here and not be resolved when the user views the config
show_updated_result = await cli.execute_cli_command("configs show test_config", list_sink)
assert show_updated_result[0][0] == env_var_update
@pytest.mark.asyncio
async def test_templates_command(cli: CLI) -> None:
result = await cli.execute_cli_command("templates test kind=volume is({{kind}})", list_sink)
assert result == [["is(volume)"]]
result = await cli.execute_cli_command("templates add filter_kind is({{kind}})", list_sink)
assert result == [["Template filter_kind added to the search library.\nis({{kind}})"]]
result = await cli.execute_cli_command("templates", list_sink)
assert result == [["filter_kind: is({{kind}})"]]
result = await cli.execute_cli_command("templates filter_kind", list_sink)
assert result == [["is({{kind}})"]]
result = await cli.execute_cli_command("templates delete filter_kind", list_sink)
assert result == [["Template filter_kind deleted from the search library."]]
@pytest.mark.asyncio
async def test_write_command(cli: CLI) -> None:
async def check_file(res: JsStream, check_content: Optional[str] = None) -> None:
async with res.stream() as streamer:
only_one = True
async for s in streamer:
fp = FilePath.from_path(s)
assert fp.local.exists() and fp.local.is_file()
assert 1 < fp.local.stat().st_size < 100000
assert fp.user.name.startswith("write_test")
assert only_one
only_one = False
if check_content:
with open(fp.local, "r") as file:
data = file.read()
assert data == check_content
# result can be read as json
await cli.execute_cli_command("search all limit 3 | format --json | write write_test.json ", check_file)
# result can be read as yaml
await cli.execute_cli_command("search all limit 3 | format --yaml | write write_test.yaml ", check_file)
# throw an exception
with pytest.raises(Exception):
await cli.execute_cli_command("echo hello | write", list_sink) # missing filename
# write enforces unescaped output.
env = {"now": utc_str()} # fix the time, so that replacements will stay equal
truecolor = CLIContext(console_renderer=ConsoleRenderer(80, 25, ConsoleColorSystem.truecolor, True), env=env)
monochrome = CLIContext(console_renderer=ConsoleRenderer.default_renderer(), env=env)
# Make sure, that the truecolor output is different from monochrome output
mono_out = await cli.execute_cli_command("help", list_sink, monochrome)
assert await cli.execute_cli_command("help", list_sink, truecolor) != mono_out
# We expect the content of the written file to contain monochrome output.
assert await cli.execute_cli_command(
"help | write write_test.txt", partial(check_file, check_content="".join(mono_out[0]) + "\n"), truecolor
)
@pytest.mark.asyncio
async def test_http_command(cli: CLI, echo_http_server: Tuple[int, List[Tuple[Request, Json]]]) -> None:
port, requests = echo_http_server
def test_arg(
arg_str: str,
method: Optional[str] = None,
url: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
params: Optional[Dict[str, str]] = None,
timeout: Optional[ClientTimeout] = None,
compress: Optional[bool] = None,
) -> None:
def test_if_set(prop: Any, value: Any) -> None:
if prop is not None:
assert prop == value, f"{prop} is not {value}"
arg = HttpCommand.parse_args("https", arg_str)
test_if_set(method, arg.method)
test_if_set(url, arg.url)
test_if_set(headers, arg.headers)
test_if_set(params, arg.params)
test_if_set(compress, arg.compress)
test_if_set(timeout, arg.timeout)
test_arg(":123", "POST", "https://localhost:123", {}, {}, ClientTimeout(30), False)
test_arg("GET :123", "GET", "https://localhost:123")
test_arg("://foo:123", "POST", "https://foo:123")
test_arg("foo:123/bla", "POST", "https://foo:123/bla")
test_arg("foo:123/bla", "POST", "https://foo:123/bla")
test_arg("foo/bla", "POST", "https://foo/bla")
test_arg(
'--compress --timeout 24 POST :123 "hdr1: test" qp==123 hdr2:fest "qp2 == 321"',
headers={"hdr1": "test", "hdr2": "fest"},
params={"qp": "123", "qp2": "321"},
compress=True,
timeout=ClientTimeout(24),
)
# take 3 instance of type bla and send it to the echo server
result = await cli.execute_cli_command(f"search is(bla) limit 3 | http :{port}/test", list_sink)
# one line is returned to the user with a summary of the response types.
assert result == [["3 requests with status 200 sent."]]
# make sure all 3 requests have been received - the body is the complete json node
assert len(requests) == 3
for ar in (AccessJson(content) for _, content in requests):
assert is_node(ar)
assert ar.reported.kind == "bla"
# failing requests are retried
requests.clear()
await cli.execute_cli_command(f"search is(bla) limit 1 | http --backoff-base 0.001 :{port}/fail", list_sink)
# 1 request + 3 retries => 4 requests
assert len(requests) == 4
@pytest.mark.asyncio
async def test_jira_alias(cli: CLI, echo_http_server: Tuple[int, List[Tuple[Request, Json]]]) -> None:
port, requests = echo_http_server
result = await cli.execute_cli_command(
f'search is(bla) | jira --url "http://localhost:{port}/success" --title test --message "test message" --username test --token test --project_id 10000 --reporter_id test',
list_sink,
)
assert result == [["1 requests with status 200 sent."]]
assert len(requests) == 1
print(requests[0][1])
assert requests[0][1] == {
"fields": {
"summary": "test",
"issuetype": {"id": "10001"},
"project": {"id": "10000"},
"description": "test message\n\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\nbla: yes or no\n... (results truncated)\n\nIssue created by Resoto",
"reporter": {"id": "test"},
"labels": ["created-by-resoto"],
}
}
@pytest.mark.asyncio
async def test_pagerduty_alias(cli: CLI, echo_http_server: Tuple[int, List[Tuple[Request, Json]]]) -> None:
port, requests = echo_http_server
result = await cli.execute_cli_command(
f'search id(0_0) | pagerduty --webhook-url "http://localhost:{port}/success" --summary test --routing-key 123 --dedup-key 234',
list_sink,
)
assert result == [["1 requests with status 200 sent."]]
assert len(requests) == 1
response = requests[0][1]
# override timestamp
assert response["payload"]["timestamp"] is not None
response["payload"]["timestamp"] = "2023-02-10T15:03:33Z"
assert requests[0][1] == {
"payload": {
"summary": "test",
"timestamp": "2023-02-10T15:03:33Z",
"source": "Resoto",
"severity": "warning",
"component": "Resoto",
"custom_details": {
"collector": {"sub_root": {"no-region": {"0_0": {"id": None, "name": "yes or no", "kind": "bla"}}}}
},
},
"routing_key": "123",
"dedup_key": "234",
"images": [
{
"src": "https://cdn.some.engineering/assets/resoto-illustrations/small/resoto-alert.png",
"href": "https://resoto.com/",
"alt": "Resoto Home Page",
}
],
"links": [],
"event_action": "trigger",
"client": "Resoto Service",
"client_url": "https://resoto.com",
}
@pytest.mark.asyncio
async def test_welcome(cli: CLI) -> None:
ctx = CLIContext(console_renderer=ConsoleRenderer.default_renderer())
result = await cli.execute_cli_command(f"welcome", list_sink, ctx)
assert "Resoto" in result[0][0]
@pytest.mark.asyncio
async def test_tip_of_the_day(cli: CLI) -> None:
ctx = CLIContext(console_renderer=ConsoleRenderer.default_renderer())
result = await cli.execute_cli_command(f"totd", list_sink, ctx)
assert generic_tips[0].command_line in result[0][0]
@pytest.mark.asyncio
async def test_certificate(cli: CLI) -> None:
result = await cli.execute_cli_command(
f"certificate create --common-name foo.resoto.com --dns-names bla --ip-addresses 1.2.3.4 --days-valid 1",
list_sink,
)
# will create 2 files
assert len(result[0]) == 2
assert [a.rsplit("/")[-1] for a in result[0]] == ["foo.resoto.com.key", "foo.resoto.com.crt"]
@pytest.mark.asyncio
async def test_execute_task(cli: CLI) -> None:
# translate a custom command to an alias template
command = WorkerCustomCommand("name", "info", {"a": "b"}, "description").to_template()
assert command.name == "name"
assert command.info == "info"