-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
api.py
1117 lines (979 loc) · 51.6 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
gds_test_api.py:
This file contains basic asserts that can support integration tests on an FPrime
deployment. This API uses the standard pipeline to get access to commands, events,
telemetry and dictionaries.
:author: koran
"""
import time
import signal
from fprime_gds.common.testing_fw import predicates
from fprime_gds.common.history.test import TestHistory
from fprime_gds.common.logger.test_logger import TestLogger
from fprime_gds.common.utils.event_severity import EventSeverity
from fprime.common.models.serialize.time_type import TimeType
class IntegrationTestAPI:
"""
A value used to begin searches after the current contents in a history and only search future
items
"""
NOW = "NOW"
def __init__(self, pipeline, logpath=None):
"""
Initializes API: constructs and registers test histories.
Args:
pipeline: a pipeline object providing access to basic GDS functionality
logpath: an optional output destination for the api test log
"""
self.pipeline = pipeline
# these are owned by the GDS and will not be modified by the test API.
self.aggregate_command_history = pipeline.get_command_history()
self.aggregate_telemetry_history = pipeline.get_channel_history()
self.aggregate_event_history = pipeline.get_event_history()
# these histories are owned by the TestAPI and are modified by the API.
self.command_history = TestHistory()
self.pipeline.register_command_consumer(self.command_history)
self.telemetry_history = TestHistory()
self.pipeline.register_telemetry_consumer(self.telemetry_history)
self.event_history = TestHistory()
self.pipeline.register_event_consumer(self.event_history)
# Initialize latest time. Will be updated whenever a time query is made.
self.latest_time = TimeType()
# Initialize the logger
if logpath is not None:
self.logger = TestLogger(logpath)
else:
self.logger = None
# A predicate used as a filter to choose which EVR's to log automatically
self.event_log_filter = self.get_event_pred()
self.pipeline.register_event_consumer(self)
# Used by the data_callback method to detect if EVR's have been received out of order.
self.last_evr = None
def teardown(self):
"""
To be called once at the end of the API's use. Closes the test log and clears histories.
"""
self.clear_histories()
if self.logger is not None:
self.logger.close_log()
self.logger = None
######################################################################################
# API Functions
######################################################################################
def start_test_case(self, case_name, case_id):
"""
To be called at the start of a test case. This function inserts a log message to denote a
new test case is beginning, records the latest time stamp in case the user clears the
aggregate histories, and then clears the API's histories.
Args:
case_name: the name of the test case (str)
case_id: a short identifier to denote the test case (str or number)
"""
msg = "[STARTING CASE] {}".format(case_name)
self.__log(msg, TestLogger.GRAY, TestLogger.BOLD, case_id=case_id)
self.get_latest_time() # called in case aggregate histories are cleared by the user
self.clear_histories()
def log(self, msg, color=None):
"""
User-accessible function to log user messages to the test log.
Args:
msg: a user-provided message to add to the test log.
color: a string containing a color hex code "######"
"""
self.__log(msg, color, sender="API user")
def get_latest_time(self):
"""
Finds the latest flight software time received by either history.
Returns:
a flight software timestamp (TimeType)
"""
events = self.aggregate_event_history.retrieve()
e_time = TimeType()
if len(events) > 0:
e_time = events[-1].get_time()
channels = self.aggregate_telemetry_history.retrieve()
t_time = TimeType().useconds
if len(channels) > 0:
t_time = channels[-1].get_time()
self.latest_time = max(e_time, t_time, self.latest_time)
return self.latest_time
def clear_histories(self, time_stamp=None):
"""
Clears the IntegrationTestAPI's histories. Because the command history is not correlated to
a flight software timestamp, it will be cleared entirely. This function can be used to set
up test cases so that the IntegrationTestAPI's histories only contain objects received
during that test.
Note: this will not clear user-created sub-histories nor the aggregate histories (histories
owned by the GDS)
Args:
time_stamp: If specified, histories are only cleared before the timestamp.
"""
if time_stamp is not None:
time_pred = predicates.greater_than_or_equal_to(time_stamp)
e_pred = predicates.event_predicate(time_pred=time_pred)
self.event_history.clear(e_pred)
t_pred = predicates.telemetry_predicate(time_pred=time_pred)
self.telemetry_history.clear(t_pred)
msg = "Clearing Test Histories after {}".format(time_stamp)
self.__log(msg, TestLogger.WHITE)
else:
self.event_history.clear()
self.telemetry_history.clear()
msg = "Clearing Test Histories"
self.__log(msg, TestLogger.WHITE)
self.command_history.clear()
def set_event_log_filter(self, event=None, args=None, severity=None, time_pred=None):
"""
Constructs an event predicate that is then used to filter which EVR's are interlaced in the
test logs. This method replaces the current filter. Calling this method with no arguments
will effectively reset the filter.
Args:
event: an optional mnemonic (str), id (int), or predicate to specify the event type
args: an optional list of arguments (list of values, predicates, or None to ignore)
severity: an EventSeverity enum or a predicate to specify the event severity
time_pred: an optional predicate to specify the flight software timestamp
"""
self.event_log_filter = self.get_event_pred(event, args, severity, time_pred)
def get_command_test_history(self):
"""
Accessor for IntegrationTestAPI's command history
Returns:
a history of CmdData objects
"""
return self.command_history
def get_telemetry_test_history(self):
"""
Accessor for IntegrationTestAPI's telemetry history
Returns:
a history of ChData objects
"""
return self.telemetry_history
def get_event_test_history(self):
"""
Accessor for IntegrationTestAPI's event history
Returns:
a history of EventData objects
"""
return self.event_history
def get_event_subhistory(self, event_filter=None):
"""
Returns a new instance of TestHistory that will be updated with new events as they come in.
Specifying a filter will only enqueue events that satisfy the filter in this new sub-history.
The returned history can be substituted into the await and assert methods of this API.
Args:
event_filter: an optional predicate to filter a subhistory.
Returns:
an instance of TestHistory
"""
subhist = TestHistory(event_filter)
self.pipeline.register_event_consumer(subhist)
return subhist
def remove_event_subhistory(self, subhist):
"""
De-registers the subhistory from the GDS. Once called, the given subhistory will stop
receiving event messages.
Args:
subhist: a TestHistory instance that is subscribed to event messages
Returns:
True if the subhistory was removed, False otherwise
"""
return self.pipeline.remove_event_consumer(subhist)
def get_telemetry_subhistory(self, telemetry_filter=None):
"""
Returns a new instance of TestHistory that will be updated with new telemetry updates as
they come in. Specifying a filter will only enqueue updates that satisfy the filter in this
new sub-history. The returned history can be substituted into the await and assert methods
of this API.
Args:
telemetry_filter: an optional predicate used to filter a subhistory.
Returns:
an instance of TestHistory
"""
hist = TestHistory(telemetry_filter)
self.pipeline.register_telemetry_consumer(hist)
return hist
def remove_telemetry_subhistory(self, subhist):
"""
De-registers the subhistory from the GDS. Once called, the given subhistory will stop
receiving telemetry updates.
Args:
subhist: a TestHistory instance that is subscribed to event messages
Returns:
True if the subhistory was removed, False otherwise
"""
return self.pipeline.remove_telemetry_consumer(subhist)
######################################################################################
# Command Functions
######################################################################################
def translate_command_name(self, command):
"""
This function will translate the given mnemonic into an ID as defined by the flight
software dictionary. This call will raise an error if the command given is not in the
dictionary.
Args:
command: Either the command id (int) or the command mnemonic (str)
Returns:
The comand ID (int)
"""
if isinstance(command, str):
cmd_dict = self.pipeline.get_command_name_dictionary()
if command in cmd_dict:
return cmd_dict[command].get_id()
else:
msg = "The command mnemonic, {}, wasn't in the dictionary".format(command)
raise KeyError(msg)
else:
cmd_dict = self.pipeline.get_command_id_dictionary()
if command in cmd_dict:
return command
else:
msg = "The command id, {}, wasn't in the dictionary".format(command)
raise KeyError(msg)
def send_command(self, command, args=[]):
"""
Sends the specified command.
Args:
command: the mnemonic (str) or ID (int) of the command to send
args: a list of command arguments.
"""
msg = "Sending Command: {} {}".format(command, args)
self.__log(msg, TestLogger.PURPLE)
command = self.translate_command_name(command)
self.pipeline.send_command(command, args)
def send_and_await_telemetry(self, command, args=[], channels=[], timeout=5):
"""
Sends the specified command and awaits the specified channel update or sequence of
updates. See await_telemetry and await_telemetry_sequence for full details.
Note: If awaiting a sequence avoid specifying timestamps.
Args:
command: the mnemonic (str) or ID (int) of the command to send
args: a list of command arguments.
channels: a single or a sequence of channel specs (event_predicates, mnemonics, or IDs)
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
The channel update or updates found by the search
"""
start = self.telemetry_history.size()
self.send_command(command, args)
if isinstance(channels, list):
return self.await_telemetry_sequence(channels, start=start, timeout=timeout)
else:
return self.await_telemetry(channels, start=start, timeout=timeout)
def send_and_await_event(self, command, args=[], events=[], timeout=5):
"""
Sends the specified command and awaits the specified event message or sequence of
messages. See await_event and await event sequence for full details.
Note: If awaiting a sequence avoid specifying timestamps.
Args:
command: the mnemonic (str) or ID (int) of the command to send
args: a list of command arguments.
events: a single or a sequence of event specifiers (event_predicates, mnemonics, or IDs)
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
The event or events found by the search
"""
start = self.event_history.size()
self.send_command(command, args)
if isinstance(events, list):
return self.await_event_sequence(events, start=start, timeout=timeout)
else:
return self.await_event(events, start=start, timeout=timeout)
######################################################################################
# Command Asserts
######################################################################################
def send_and_assert_telemetry(self, command, args=[], channels=[], timeout=5):
"""
Sends the specified command and asserts on the specified channel update or sequence of
updates. See await_telemetry and await_telemetry_sequence for full details.
Note: If awaiting a sequence avoid specifying timestamps.
Args:
command: the mnemonic (str) or ID (int) of the command to send
args: a list of command arguments.
channels: a single or a sequence of channel specs (event_predicates, mnemonics, or IDs)
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
The channel update or updates found by the search
"""
start = self.telemetry_history.size()
self.send_command(command, args)
if isinstance(channels, list):
return self.assert_telemetry_sequence(channels, start=start, timeout=timeout)
else:
return self.assert_telemetry(channels, start=start, timeout=timeout)
def send_and_assert_event(self, command, args=[], events=[], timeout=5):
"""
Sends the specified command and asserts on the specified event message or sequence of
messages. See assert_event and assert event sequence for full details.
Args:
command: the mnemonic (str) or ID (int) of the command to send
args: a list of command arguments.
events: a single or a sequence of event specifiers (event_predicates, mnemonics, or IDs)
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
The event or events found by the search
"""
start = self.event_history.size()
self.send_command(command, args)
if isinstance(events, list):
return self.assert_event_sequence(events, start=start, timeout=timeout)
else:
return self.assert_event(events, start=start, timeout=timeout)
######################################################################################
# Telemetry Functions
######################################################################################
def translate_telemetry_name(self, channel):
"""
This function will translate the given mnemonic into an ID as defined by the flight
software dictionary. This call will raise an error if the channel given is not in the
dictionary.
Args:
channel: a channel mnemonic (str) or id (int)
Returns:
the channel ID (int)
"""
if isinstance(channel, str):
ch_dict = self.pipeline.get_channel_name_dictionary()
if channel in ch_dict:
return ch_dict[channel].get_id()
else:
msg = "The telemetry mnemonic, {}, wasn't in the dictionary".format(channel)
raise KeyError(msg)
else:
ch_dict = self.pipeline.get_channel_id_dictionary()
if channel in ch_dict:
return channel
else:
msg = "The telemetry mnemonic, {}, wasn't in the dictionary".format(channel)
raise KeyError(msg)
def get_telemetry_pred(self, channel=None, value=None, time_pred=None):
"""
This function will translate the channel ID, and construct a telemetry_predicate object. It
is used as a helper by the IntegrationTestAPI, but could also be helpful to a user of the
test API. If channel is already an instance of telemetry_predicate, it will be returned
immediately. The provided implementation of telemetry_predicate evaluates true if and only
if all specified constraints are satisfied. If a specific constraint isn't specified, then
it will not effect the outcome; this means all arguments are optional. If no constraints
are specified, the predicate will always return true.
Args:
channel: an optional mnemonic (str), id (int), or predicate to specify the channel type
value: an optional value (object/number) or predicate to specify the value field
time_pred: an optional predicate to specify the flight software timestamp
Returns:
an instance of telemetry_predicate
"""
if isinstance(channel, predicates.telemetry_predicate):
return channel
if not predicates.is_predicate(channel) and channel is not None:
channel = self.translate_telemetry_name(channel)
channel = predicates.equal_to(channel)
if not predicates.is_predicate(value) and value is not None:
value = predicates.equal_to(value)
return predicates.telemetry_predicate(channel, value, time_pred)
def await_telemetry(
self, channel, value=None, time_pred=None, history=None, start="NOW", timeout=5
):
"""
A search for a single telemetry update received. By default, the call will only await
until a correct update is found. The user can specify that await also searches the current
history by specifying a value for start. On timeout, the search will return None.
Args:
channel: a channel specifier (mnemonic, id, or predicate)
value: optional value (object/number) or predicate to specify the value field
time_pred: an optional predicate to specify the flight software timestamp
history: if given, a substitute history that the function will search and await
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
the ChData object found during the search, otherwise, None
"""
t_pred = self.get_telemetry_pred(channel, value, time_pred)
if history is None:
history = self.get_telemetry_test_history()
return self.find_history_item(t_pred, history, start, timeout)
def await_telemetry_sequence(self, channels, history=None, start="NOW", timeout=5):
"""
A search for a sequence of telemetry updates. By default, the call will only await until
the sequence is completed. The user can specify that await also searches the history by
specifying a value for start. On timeout, the search will return the list of found
channel updates regardless of whether the sequence is complete.
Note: It is reccomended (but not enforced) not to specify timestamps for this assert.
Note: This function will always return a list of updates. The user should check if the
sequence was completed.
Args:
channels: an ordered list of channel specifiers (mnemonic, id, or predicate)
history: if given, a substitute history that the function will search and await
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
an ordered list of ChData objects that satisfies the sequence
"""
seq_preds = []
for channel in channels:
seq_preds.append(self.get_telemetry_pred(channel))
if history is None:
history = self.get_telemetry_test_history()
return self.find_history_sequence(seq_preds, history, start, timeout)
def await_telemetry_count(
self, count, channels=None, history=None, start="NOW", timeout=5
):
"""
A search on the number of telemetry updates received. By default, the call will only await
until a correct count is achieved. The user can specify that await also searches the current
history by specifying a value for start. On timeout, the search will return the list of
found channel updates regardless of whether a correct count is achieved.
Note: this search will always return a list of objects. The user should check if the search
was completed.
Args:
count: either an exact amount (int) or a predicate to specify how many objects to find
channels: a channel specifier or list of channel specifiers (mnemonic, ID, or predicate)
history: if given, a substitute history that the function will search and await
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
a list of the ChData objects that were counted
"""
if channels is None:
search = None
elif isinstance(channels, list):
t_preds = []
for channel in channels:
t_preds.append(self.get_telemetry_pred(channel=channel))
search = predicates.satisfies_any(t_preds)
else:
search = self.get_telemetry_pred(channel=channels)
if history is None:
history = self.get_telemetry_test_history()
return self.find_history_count(count, history, search, start, timeout)
######################################################################################
# Telemetry Asserts
######################################################################################
def assert_telemetry(
self, channel, value=None, time_pred=None, history=None, start=None, timeout=0
):
"""
An assert on a single telemetry update received. If the history doesn't have the
correct update, the call will await until a correct update is received or the
timeout, at which point it will assert failure.
Args:
channel: a channel specifier (mnemonic, id, or predicate)
value: optional value (object/number) or predicate to specify the value field
time_pred: an optional predicate to specify the flight software timestamp
history: if given, a substitute history that the function will search and await
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
the ChData object found during the search
"""
pred = self.get_telemetry_pred(channel, value, time_pred)
result = self.await_telemetry(
channel, value, time_pred, history, start, timeout
)
self.__assert_pred("Telemetry Received", pred, result)
return result
def assert_telemetry_sequence(self, channels, history=None, start=None, timeout=0):
"""
A search for a sing sequence of telemetry updates messages. If the history doesn't have the
complete sequence, the call will await until the sequence is completed or the timeout, at
which point it will return the list of found channel updates.
Note: It is reccomended (but not enforced) not to specify timestamps for this assert.
Note: This function will always return a list of updates the user should check if the
sequence was completed.
Args:
channels: an ordered list of channel specifiers (mnemonic, id, or predicate)
history: if given, a substitute history that the function will search and await
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
an ordered list of ChData objects that satisfies the sequence
"""
results = self.await_telemetry_sequence(channels, history, start, timeout)
len_pred = predicates.equal_to(len(channels))
self.__assert_pred("Telemetry Sequence", len_pred, len(results))
return results
def assert_telemetry_count(
self, count, channels=None, history=None, start=None, timeout=0
):
"""
An assert on the number of channel updates received. If the history doesn't have the
correct update count, the call will await until a correct count is achieved or the
timeout, at which point it will assert failure.
Args:
count: either an exact amount (int) or a predicate to specify how many objects to find
channels: a channel specifier or list of channel specifiers (mnemonic, ID, or predicate)
history: if given, a substitute history that the function will search and await
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
a list of the ChData objects that were counted
"""
results = self.await_telemetry_count(count, channels, history, start, timeout)
if predicates.is_predicate(count):
count_pred = count
elif isinstance(count, int):
count_pred = predicates.equal_to(count)
self.__assert_pred("Telemetry Count", count_pred, len(results))
return results
######################################################################################
# Event Functions
######################################################################################
def translate_event_name(self, event):
"""
This function will translate the given mnemonic into an ID as defined by the
flight software dictionary. This call will raise an error if the event given is
not in the dictionary.
Args:
event: an event mnemonic (str) or ID (int)
Returns:
the event ID (int)
"""
if isinstance(event, str):
event_dict = self.pipeline.get_event_name_dictionary()
if event in event_dict:
return event_dict[event].get_id()
else:
msg = "The event mnemonic, {}, wasn't in the dictionary".format(event)
raise KeyError(msg)
else:
event_dict = self.pipeline.get_event_id_dictionary()
if event in event_dict:
return event
else:
msg = "The event id, {}, wasn't in the dictionary".format(event)
raise KeyError(msg)
def get_event_pred(self, event=None, args=None, severity=None, time_pred=None):
"""
This function will translate the event ID, and construct an event_predicate object. It is
used as a helper by the IntegrationTestAPI, but could also be helpful to a user of the test
API. If event is already an instance of event_predicate, it will be returned immediately.
The provided implementation of event_predicate evaluates true if and only if all specified
constraints are satisfied. If a specific constraint isn't specified, then it will not
effect the outcome; this means all arguments are optional. If no constraints are specified,
the predicate will always return true.
Args:
event: mnemonic (str), id (int), or predicate to specify the event type
args: list of arguments (list of values, predicates, or None to ignore)
severity: an EventSeverity enum or a predicate to specify the event severity
time_pred: predicate to specify the flight software timestamp
Returns:
an instance of event_predicate
"""
if isinstance(event, predicates.event_predicate):
return event
if not predicates.is_predicate(event) and event is not None:
event = self.translate_event_name(event)
event = predicates.equal_to(event)
if not predicates.is_predicate(args) and args is not None:
args = predicates.args_predicate(args)
if not predicates.is_predicate(severity) and severity is not None:
if not isinstance(severity, EventSeverity):
msg = "Given severity was not a valid Severity Enum Value: {} ({})".format(severity, type(severity))
raise TypeError(msg)
severity = predicates.equal_to(severity)
return predicates.event_predicate(event, args, severity, time_pred)
def await_event(
self, event, args=None, severity=None, time_pred=None, history=None, start="NOW", timeout=5
):
"""
A search for a single event message received. By default, the call will only await until a
correct message is found. The user can specify that await also searches the current history
by specifying a value for start. On timeout, the search will return None.
Args:
event: an event specifier (mnemonic, id, or predicate)
args: a list of expected arguments (list of values, predicates, or None for don't care)
severity: an EventSeverity enum or a predicate to specify the event severity
time_pred: an optional predicate to specify the flight software timestamp
history: if given, a substitute history that the function will search and await
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
the EventData object found during the search, otherwise, None
"""
e_pred = self.get_event_pred(event, args, severity, time_pred)
if history is None:
history = self.get_event_test_history()
return self.find_history_item(e_pred, history, start, timeout)
def await_event_sequence(self, events, history=None, start="NOW", timeout=5):
"""
A search for a sequence of event messages. By default, the call will only await until
the sequence is completed. The user can specify that await also searches the history by
specifying a value for start. On timeout, the search will return the list of found
event messages regardless of whether the sequence is complete.
Note: It is reccomended (but not enforced) not to specify timestamps for this assert.
Note: This function will always return a list of events the user should check if the
sequence was completed.
Args:
events: an ordered list of event specifiers (mnemonic, id, or predicate)
history: if given, a substitute history that the function will search and await
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
an ordered list of EventData objects that satisfies the sequence
"""
seq_preds = []
for event in events:
seq_preds.append(self.get_event_pred(event))
if history is None:
history = self.get_event_test_history()
return self.find_history_sequence(seq_preds, history, start, timeout)
def await_event_count(
self, count, events=None, history=None, start="NOW", timeout=5
):
"""
A search on the number of events received. By default, the call will only await until a
correct count is achieved. The user can specify that this await also searches the current
history by specifying a value for start. On timeout, the search will return the list of
found event messages regardless of whether a correct count is achieved.
Note: this search will always return a list of objects. The user should check if the search
was completed.
Args:
count: either an exact amount (int) or a predicate to specify how many objects to find
events: an event specifier or list of event specifiers (mnemonic, ID, or predicate)
history: if given, a substitute history that the function will search and await
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
a list of the EventData objects that were counted
"""
if events is None:
search = None
elif isinstance(events, list):
e_preds = []
for event in events:
e_preds.append(self.get_event_pred(event=event))
search = predicates.satisfies_any(e_preds)
else:
search = self.get_event_pred(event=events)
if history is None:
history = self.get_event_test_history()
return self.find_history_count(count, history, search, start, timeout)
######################################################################################
# Event Asserts
######################################################################################
def assert_event(
self, event, args=None, severity=None, time_pred=None, history=None, start=None, timeout=0
):
"""
An assert on a single event message received. If the history doesn't have the
correct message, the call will await until a correct message is received or the
timeout, at which point it will assert failure.
Args:
event: an event specifier (mnemonic, id, or predicate)
args: a list of expected arguments (list of values, predicates, or None for don't care)
severity: an EventSeverity enum or a predicate to specify the event severity
time_pred: an optional predicate to specify the flight software timestamp
history: if given, a substitute history that the function will search and await
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
the EventData object found during the search
"""
pred = self.get_event_pred(event, args, severity, time_pred)
result = self.await_event(event, args, severity, time_pred, history, start, timeout)
self.__assert_pred("Event Received", pred, result)
return result
def assert_event_sequence(self, events, history=None, start=None, timeout=0):
"""
An assert that a sequence of event messages is received. If the history doesn't have the
complete sequence, the call will await until the sequence is completed or the timeout, at
which point it will assert failure.
Note: It is reccomended (but not enforced) not to specify timestamps for this assert.
Args:
events: an ordered list of event specifiers (mnemonic, id, or predicate)
history: if given, a substitute history that the function will search and await
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
an ordered list of EventData objects that satisfied the sequence
"""
results = self.await_event_sequence(events, history, start, timeout)
len_pred = predicates.equal_to(len(events))
self.__assert_pred("Event Sequence length", len_pred, len(results))
return results
def assert_event_count(
self, count, events=None, history=None, start=None, timeout=0
):
"""
An assert on the number of events received. If the history doesn't have the
correct event count, the call will await until a correct count is achieved or the
timeout, at which point it will assert failure.
Args:
count: either an exact amount (int) or a predicate to specify how many objects to find
events: optional event specifier or list of specifiers (mnemonic, id, or predicate)
history: if given, a substitute history that the function will search and await
start: an optional index or predicate to specify the earliest item to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
a list of the EventData objects that were counted
"""
results = self.await_event_count(count, events, history, start, timeout)
if predicates.is_predicate(count):
count_pred = count
elif isinstance(count, int):
count_pred = predicates.equal_to(count)
self.__assert_pred("Event Count", count_pred, len(results))
return results
######################################################################################
# History Searches
######################################################################################
class TimeoutException(Exception):
"""
This exception is used by the history searches to signal the end of the timeout.
"""
pass
class __HistorySearcher():
"""
This class defines the calls made by the __history_search helper method and has a unique
implementation for each type of search provided by the api.
"""
def __init__(self):
self.ret_val = None
raise NotImplementedError()
def search_current_history(self, items):
"""
Searches the scoped existing history
Return:
True if the search was satisfied, False otherwise
"""
raise NotImplementedError()
def incremental_search(self, item):
"""
Searches one awaited item at a time
Return:
True if the search was satisfied, False otherwise
"""
raise NotImplementedError()
def get_return_value(self):
"""
Returns the result of the search whether the search is successful or not
"""
return self.ret_val
def __timeout_sig_handler(self, signum, frame):
raise self.TimeoutException()
def __search_test_history(self, searcher, name, history, start=None, timeout=0):
"""
This helper method contains the common logic to all search methods in the test API. This
means searches on both the event and channel histories rely on this helper. Each history
search is performed on both current history items and then on items that have yet to be
added to the history. The API defines these two scopes using the variables start and
timeout. They have several useful behaviors.
start is used to pick the earliest item to search in the current history. start can be
specified as either a predicate to search for the first item, an index of the history, the
API variable NOW, or an instance of the TimeType timestamp object. The behavior of NOW is
to ignore the current history and only search awaited items until the timestamp. The
behavior of giving a timetype is to only search items that happened at or after the
specified timestamp.
timeout is a specification of how long to await future items in seconds. Specifying a
timeout of 0 will ignore all future items. The timeout specifies an increment of time
relative to the local clock, not the embedded application's clock.
Note: the API does not try to check for edge cases where the final item in a search is
received as the search times out. The user should ensure that their timeouts are sufficient
to complete any awaiting searches.
Finally, the test API supports the ability to substitute a history object for any search.
This is part of why history must be specified for each search
Args:
searcher: an instance of __HistorySearcher to execute search-specific logic
name: a string name to differentiate the type of search
history: the TestHistory object to conduct the search on
start: an index, a predicate, the NOW variable, or a TimeType timestamp to pick the
first item to search
timeout: the number of seconds to await future items
"""
if start == self.NOW:
start = history.size()
elif isinstance(start, TimeType):
time_pred = predicates.greater_than_or_equal_to(start)
e_pred = self.get_telemetry_pred(time_pred=time_pred)
t_pred = self.get_event_pred(time_pred=time_pred)
start = predicates.satisfies_any([e_pred, t_pred])
current = history.retrieve(start)
if searcher.search_current_history(current):
return searcher.get_return_value()
if timeout:
self.__log(name + " now awaiting for at most {} s.".format(timeout))
try:
signal.signal(signal.SIGALRM, self.__timeout_sig_handler)
signal.alarm(timeout)
while True:
new_items = history.retrieve_new()
for item in new_items:
if searcher.incremental_search(item):
return searcher.get_return_value()
time.sleep(0.1)
except self.TimeoutException:
self.__log(name + " timed out and ended unsuccessfully.", TestLogger.YELLOW)
finally:
signal.alarm(0)
else:
self.__log(name + " ended unsuccessfully.", TestLogger.YELLOW)
return searcher.get_return_value()
def find_history_item(self, search_pred, history, start=None, timeout=0):
"""
This function can both search and await for an element in a history. The function will
return the first valid object it finds. The search will return when an object is found, or
the timeout is reached.
Args:
search_pred: a predicate to specify a history item.
history: the history that the function will search and await
start: an index or predicate to specify the earliest item from the history to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
the data object found during the search, otherwise, None
"""
class __ItemSearcher(self.__HistorySearcher):
def __init__(self, log, search_pred):
self.log = log
self.search_pred = search_pred
self.ret_val = None
msg = "Beginning an item search for an item that satisfies:\n {}".format(self.search_pred)
self.log(msg, TestLogger.YELLOW)
def search_current_history(self, items):
for item in items:
if self.incremental_search(item):
return True
return False
def incremental_search(self, item):
if self.search_pred(item):
msg = "History search found the specified item: {}".format(item)
self.log(msg, TestLogger.YELLOW)
self.ret_val = item
return True
return False
searcher = __ItemSearcher(self.__log, search_pred)
return self.__search_test_history(searcher, "Item search", history, start, timeout)
def find_history_sequence(self, seq_preds, history, start=None, timeout=0):
"""
This function can both search and await for a sequence of elements in a history. The
function will return a list of the history objects to satisfy the sequence search. The
search will return when an order of data objects is found that satisfies the entire
sequence, or the timeout occurs.
Note: this search will always return a list of objects. The user should check if the search
was completed.
Args:
seq_preds: an ordered list of predicate objects to specify a sequence
history: the history that the function will search and await
start: an index or predicate to specify the earliest item from the history to search
timeout: the number of seconds to wait before terminating the search (int)
Returns:
a list of data objects that satisfied the sequence
"""
class __SequenceSearcher(self.__HistorySearcher):
def __init__(self, log, seq_preds):
self.log = log
self.ret_val = []
self.seq_preds = seq_preds.copy()
msg = "Beginning a sequence search of {} items.".format(len(self.seq_preds))
self.log(msg, TestLogger.YELLOW)
def search_current_history(self, items):
if len(self.seq_preds) == 0:
msg = "Sequence search finished, as the specified sequence had 0 items."
self.log(msg, TestLogger.YELLOW)
return True
for item in items: