/
instrument_agent.py
1297 lines (1084 loc) · 57.9 KB
/
instrument_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
"""
@package ion.agents.instrument.instrument_agent Instrument resource agent
@file ion/agents.instrument/instrument_agent.py
@author Edward Hunter
@brief Resource agent derived class providing an instrument agent as a resource.
This resource fronts instruments and instrument drivers one-to-one in ION.
"""
__author__ = 'Edward Hunter'
__license__ = 'Apache 2.0'
# Pyon imports
from pyon.public import IonObject, log, RT, PRED, LCS, OT, CFG
from pyon.agent.agent import ResourceAgent
from pyon.agent.agent import ResourceAgentEvent
from pyon.agent.agent import ResourceAgentState
from pyon.agent.agent import ResourceAgentStreamStatus
from pyon.util.containers import get_ion_ts
from pyon.core.governance import ORG_MANAGER_ROLE, GovernanceHeaderValues, has_org_role, get_resource_commitments
from ion.services.sa.observatory.observatory_management_service import INSTRUMENT_OPERATOR_ROLE, OBSERVATORY_OPERATOR_ROLE
from pyon.public import IonObject
# Pyon exceptions.
from pyon.core.exception import IonException, Inconsistent
from pyon.core.exception import BadRequest
from pyon.core.exception import Conflict
from pyon.core.exception import Timeout
from pyon.core.exception import NotFound
from pyon.core.exception import ServerError
from pyon.core.exception import ResourceError
# Standard imports.
import socket
import json
import copy
# Packages
import gevent
# ION imports.
from ion.agents.instrument.driver_process import DriverProcess
from ion.agents.instrument.common import BaseEnum
from ion.agents.instrument.instrument_fsm import FSMStateError
from ion.agents.instrument.instrument_fsm import FSMCommandUnknownError
from ion.agents.instrument.direct_access.direct_access_server import DirectAccessTypes
from ion.agents.instrument.direct_access.direct_access_server import DirectAccessServer
from ion.agents.instrument.direct_access.direct_access_server import SessionCloseReasons
from ion.agents.agent_stream_publisher import AgentStreamPublisher
from ion.agents.agent_alert_manager import AgentAlertManager
# MI imports
from ion.core.includes.mi import DriverAsyncEvent
from interface.objects import StreamAlertType
from interface.objects import AgentCommand, StatusType, DeviceStatusType, AggregateStatusType
class InstrumentAgentState(BaseEnum):
POWERED_DOWN = ResourceAgentState.POWERED_DOWN
UNINITIALIZED = ResourceAgentState.UNINITIALIZED
INACTIVE = ResourceAgentState.INACTIVE
IDLE = ResourceAgentState.IDLE
STOPPED = ResourceAgentState.STOPPED
COMMAND = ResourceAgentState.COMMAND
STREAMING = ResourceAgentState.STREAMING
TEST = ResourceAgentState.TEST
CALIBRATE = ResourceAgentState.CALIBRATE
BUSY = ResourceAgentState.BUSY
LOST_CONNECTION = ResourceAgentState.LOST_CONNECTION
ACTIVE_UNKNOWN = ResourceAgentState.ACTIVE_UNKNOWN
class InstrumentAgentEvent(BaseEnum):
ENTER = ResourceAgentEvent.ENTER
EXIT = ResourceAgentEvent.EXIT
POWER_UP = ResourceAgentEvent.POWER_UP
POWER_DOWN = ResourceAgentEvent.POWER_DOWN
INITIALIZE = ResourceAgentEvent.INITIALIZE
GO_ACTIVE = ResourceAgentEvent.GO_ACTIVE
GO_INACTIVE = ResourceAgentEvent.GO_INACTIVE
RUN = ResourceAgentEvent.RUN
CLEAR = ResourceAgentEvent.CLEAR
PAUSE = ResourceAgentEvent.PAUSE
RESUME = ResourceAgentEvent.RESUME
GO_COMMAND = ResourceAgentEvent.GO_COMMAND
GO_DIRECT_ACCESS = ResourceAgentEvent.GO_DIRECT_ACCESS
GET_RESOURCE = ResourceAgentEvent.GET_RESOURCE
SET_RESOURCE = ResourceAgentEvent.SET_RESOURCE
EXECUTE_RESOURCE = ResourceAgentEvent.EXECUTE_RESOURCE
GET_RESOURCE_STATE = ResourceAgentEvent.GET_RESOURCE_STATE
GET_RESOURCE_CAPABILITIES = ResourceAgentEvent.GET_RESOURCE_CAPABILITIES
DONE = ResourceAgentEvent.DONE
PING_RESOURCE = ResourceAgentEvent.PING_RESOURCE
LOST_CONNECTION = ResourceAgentEvent.LOST_CONNECTION
AUTORECONNECT = ResourceAgentEvent.AUTORECONNECT
class InstrumentAgentCapability(BaseEnum):
INITIALIZE = ResourceAgentEvent.INITIALIZE
RESET = ResourceAgentEvent.RESET
GO_ACTIVE = ResourceAgentEvent.GO_ACTIVE
GO_INACTIVE = ResourceAgentEvent.GO_INACTIVE
RUN = ResourceAgentEvent.RUN
CLEAR = ResourceAgentEvent.CLEAR
PAUSE = ResourceAgentEvent.PAUSE
RESUME = ResourceAgentEvent.RESUME
GO_COMMAND = ResourceAgentEvent.GO_COMMAND
GO_DIRECT_ACCESS = ResourceAgentEvent.GO_DIRECT_ACCESS
class ResourceInterfaceCapability(BaseEnum):
GET_RESOURCE = ResourceAgentEvent.GET_RESOURCE
SET_RESOURCE = ResourceAgentEvent.SET_RESOURCE
PING_RESOURCE = ResourceAgentEvent.PING_RESOURCE
GET_RESOURCE_STATE = ResourceAgentEvent.GET_RESOURCE_STATE
EXECUTE_RESOURCE = ResourceAgentEvent.EXECUTE_RESOURCE
class InstrumentAgent(ResourceAgent):
"""
ResourceAgent derived class for the instrument agent. This class
logically abstracts instruments as taskable resources in the ION
system. It directly provides common functionality (common state model,
common resource interface, point of publication) and creates
a driver process to specialize for particular hardware.
"""
# Override to publish specific types of events
COMMAND_EVENT_TYPE = "DeviceCommandEvent"
# Override to set specific origin type
ORIGIN_TYPE = "InstrumentDevice"
def __init__(self, *args, **kwargs):
"""
"""
super(InstrumentAgent, self).__init__(self, *args, **kwargs)
###############################################################################
# Instrument agent internal parameters.
###############################################################################
#This is the type of Resource managed by this agent
self.resource_type = RT.InstrumentDevice
# Driver configuration. Passed as part of the spawn configuration
# or with an initialize command. Sets driver specific
# context.
self._dvr_config = None
# The driver process popen object. To terminate, signal, wait on,
# or otherwise interact with the driver process via subprocess.
# Set by transition to inactive.
self._dvr_proc = None
# The driver client for communicating to the driver process in
# request-response or event publication. Set by transition to
# inactive.
self._dvr_client = None
# Flag indicates if the agent is running in a test so that it
# can instruct drivers to self destruct if it disappears.
self._test_mode = False
# Set the 'reason' to be the default.
self._da_session_close_reason = 'due to ION request'
# The direct accerss server
self._da_server = None
# List of current alarm objects.
self.aparam_alerts = []
# The get/set helpers are set by the manager class.
self.aparam_get_alerts = None
self.aparam_set_alerts = None
#list of the aggreate status states for this device
self.aparam_aggstatus = {}
# The set helpers are set by the manager class.
# Set is read only. Use base class get function.
self.aparam_set_aggstatus = None
# Dictionary of stream fields.
self.aparam_streams = {}
# The set helper are set by the manager class (read only).
# We use the default base class get function.
self.aparam_set_streams = None
# Dictionary of stream publication rates.
self.aparam_pubrate = {}
# The set helper is set by the manager class.
# Use default base class get function.
self.aparam_set_pubrate = None
# Autoreconnect thread.
self._autoreconnect_greenlet = None
# State when lost.
self._state_when_lost = None
# Agent stream publisher.
self._asp = None
# Agent alert manager.
self._aam = None
# Default initial state.
self._initial_state = ResourceAgentState.UNINITIALIZED
def on_init(self):
"""
Instrument agent pyon process initialization.
Init objects that depend on the container services and start state
machine.
"""
# Set the driver config from the agent config if present.
self._dvr_config = self.CFG.get('driver_config', None)
# Set the test mode.
self._test_mode = self.CFG.get('test_mode', False)
# Set up streams.
self._asp = AgentStreamPublisher(self)
# Set up alert manager.
self._aam = AgentAlertManager(self)
# Superclass on_init attemtps state restore.
super(InstrumentAgent, self).on_init()
def on_quit(self):
"""
"""
super(InstrumentAgent, self).on_quit()
self._aam.stop_all()
state = self._fsm.get_current_state()
if state == ResourceAgentState.UNINITIALIZED:
pass
elif state == ResourceAgentState.INACTIVE:
result = self._stop_driver()
else:
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
##############################################################
# Capabilities interface and event handlers.
##############################################################
def _handler_get_resource_capabilities(self, *args, **kwargs):
"""
"""
result = None
next_state = None
result = self._dvr_client.cmd_dvr('get_resource_capabilities', *args, **kwargs)
return (next_state, result)
def _filter_capabilities(self, events):
events_out = [x for x in events if InstrumentAgentCapability.has(x)]
return events_out
def _get_resource_interface(self, current_state=True):
"""
"""
agent_cmds = self._fsm.get_events(current_state)
res_iface_cmds = [x for x in agent_cmds if ResourceInterfaceCapability.has(x)]
# convert agent mediated resource commands into interface names.
result = []
for x in res_iface_cmds:
if x == ResourceAgentEvent.GET_RESOURCE:
result.append('get_resource')
elif x == ResourceAgentEvent.SET_RESOURCE:
result.append('set_resource')
elif x == ResourceAgentEvent.PING_RESOURCE:
result.append('ping_resource')
elif x == ResourceAgentEvent.GET_RESOURCE_STATE:
result.append('get_resource_state')
elif x == ResourceAgentEvent.EXECUTE_RESOURCE:
result.append('execute_resource')
return result
##############################################################
# Agent interface.
##############################################################
##############################################################
# Governance interfaces
##############################################################
#TODO - When/If the Instrument and Platform agents are dervied from a
# common device agent class, then relocate to the parent class and share
def check_resource_operation_policy(self, msg, headers):
'''
This function is used for governance validation for certain agent operations.
@param msg:
@param headers:
@return:
'''
try:
gov_values = GovernanceHeaderValues(headers, resource_id_required=False)
except Inconsistent, ex:
return False, ex.message
if has_org_role(gov_values.actor_roles ,self._get_process_org_governance_name(),
[ORG_MANAGER_ROLE, OBSERVATORY_OPERATOR_ROLE]):
return True, ''
if not has_org_role(gov_values.actor_roles ,self._get_process_org_governance_name(),
INSTRUMENT_OPERATOR_ROLE):
return False, '%s(%s) has been denied since the user %s does not have the %s role for Org %s'\
% (self.name, gov_values.op, gov_values.actor_id, INSTRUMENT_OPERATOR_ROLE,
self._get_process_org_governance_name())
com = get_resource_commitments(gov_values.actor_id,
gov_values.resource_id)
if com is None:
return False, '%s(%s) has been denied since the user %s has not acquired the resource %s' \
% (self.name, gov_values.op, gov_values.actor_id,
self.resource_id)
return True, ''
##############################################################
# Resource interface and common resource event handlers.
##############################################################
def _handler_get_resource(self, *args, **kwargs):
try:
params = args[0]
# Raise ION BadRequest if required parameters missing.
except KeyError:
raise BadRequest('get_resource missing parameters argument.')
result = self._dvr_client.cmd_dvr('get_resource', params)
return (None, result)
def _handler_set_resource(self, *args, **kwargs):
try:
params = args[0]
except KeyError:
raise BadRequest('set_resource missing parameters argument.')
result = self._dvr_client.cmd_dvr('set_resource', params)
return (None, result)
def _handler_execute_resource(self, *args, **kwargs):
return self._dvr_client.cmd_dvr('execute_resource', *args, **kwargs)
def _handler_get_resource_state(self, *args, **kwargs):
result = self._dvr_client.cmd_dvr('get_resource_state',*args, **kwargs)
return (None, result)
def _handler_ping_resource(self, *args, **kwargs):
result = '%s, time:%s' % (self._dvr_client.cmd_dvr('process_echo', *args, **kwargs),
get_ion_ts())
return (None, result)
def _handler_done(self, *args, **kwargs):
return (ResourceAgentState.COMMAND, None)
##############################################################
# UNINITIALIZED event handlers.
##############################################################
def _handler_uninitialized_initialize(self, *args, **kwargs):
# If a config is passed, update member.
try:
self._dvr_config = args[0]
except IndexError:
pass
# If config not valid, fail.
if not self._validate_driver_config():
raise BadRequest('The driver configuration is missing or invalid.')
# Start the driver and switch to inactive.
self._start_driver(self._dvr_config)
return (ResourceAgentState.INACTIVE, None)
##############################################################
# INACTIVE event handlers.
##############################################################
def _handler_inactive_reset(self, *args, **kwargs):
result = self._stop_driver()
return (ResourceAgentState.UNINITIALIZED, result)
def _handler_inactive_go_active(self, *args, **kwargs):
# Set the driver config if passed as a parameter.
try:
self._dvr_config['comms_config'] = args[0]
except IndexError:
pass
# Connect to the device.
dvr_comms = self._dvr_config.get('comms_config', None)
self._dvr_client.cmd_dvr('configure', dvr_comms)
self._dvr_client.cmd_dvr('connect')
# Reset the connection id and index.
self._asp.reset_connection()
max_tries = kwargs.get('max_tries', 5)
if not isinstance(max_tries, int) or max_tries < 1:
max_tries = 5
no_tries = 0
while True:
try:
next_state = self._dvr_client.cmd_dvr('discover_state')
break
except Timeout, ResourceError:
no_tries += 1
if no_tries >= max_tries:
log.error("Could not discover instrument state")
next_state = ResourceAgentState.ACTIVE_UNKNOWN
#self._dvr_client.cmd_dvr('disconnect')
#raise
return (next_state, None)
##############################################################
# IDLE event handlers.
##############################################################
def _handler_idle_reset(self, *args, **kwargs):
# Disconnect, initialize, stop driver and go to uninitialized.
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
next_state = ResourceAgentState.UNINITIALIZED
return (next_state, result)
def _handler_idle_go_inactive(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
return (ResourceAgentState.INACTIVE, None)
def _handler_idle_run(self, *args, **kwargs):
# TODO: need to determine correct obs state to enter (streaming or
# command, and follow agent transitions as needed.)
return (ResourceAgentState.COMMAND, None)
##############################################################
# STOPPED event handlers.
##############################################################
def _handler_stopped_reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
return (ResourceAgentState.UNINITIALIZED, result)
def _handler_stopped_go_inactive(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
return (ResourceAgentState.INACTIVE, None)
def _handler_stopped_resume(self, *args, **kwargs):
return (ResourceAgentState.COMMAND, None)
def _handler_stopped_clear(self, *args, **kwargs):
return (ResourceAgentState.IDLE, None)
##############################################################
# COMMAND event handlers.
##############################################################
def _handler_command_reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
return (ResourceAgentState.UNINITIALIZED, result)
def _handler_command_go_inactive(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
return (ResourceAgentState.INACTIVE, None)
def _handler_command_clear(self, *args, **kwargs):
return (ResourceAgentState.IDLE, None)
def _handler_command_pause(self, *args, **kwargs):
return (ResourceAgentState.STOPPED, None)
def _handler_command_go_direct_access(self, *args, **kwargs):
session_timeout = kwargs.get('session_timeout', 10)
inactivity_timeout = kwargs.get('inactivity_timeout', 5)
session_type = kwargs.get('session_type', None)
if not session_type:
raise BadRequest('Instrument parameter error attempting direct access: session_type not present')
log.info("Instrument agent requested to start direct access mode: sessionTO=%d, inactivityTO=%d, session_type=%s",
session_timeout, inactivity_timeout,
dir(DirectAccessTypes)[session_type])
# get 'address' of host
hostname = socket.gethostname()
ip_addresses = socket.gethostbyname_ex(hostname)
log.debug("hostname: %s, ip address: %s", hostname, ip_addresses)
# ip_address = ip_addresses[2][0]
ip_address = hostname
# create a DA server instance (TODO: just telnet for now) and pass
# in callback method
try:
self._da_server = DirectAccessServer(session_type,
self._da_server_input_processor,
ip_address,
session_timeout,
inactivity_timeout)
except Exception as ex:
log.warning("InstrumentAgent: failed to start DA Server <%s>",ex)
raise ex
# get the connection info from the DA server to return to the user
port, token = self._da_server.get_connection_info()
result = {'ip_address':ip_address, 'port':port, 'token':token}
# tell driver to start direct access mode
next_state, _ = self._dvr_client.cmd_dvr('start_direct')
return (next_state, result)
##############################################################
# STREAMING event handlers.
##############################################################
def _handler_streaming_reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
return (ResourceAgentState.UNINITIALIZED, result)
def _handler_streaming_go_inactive(self, *args, **kwargs):
self._dvr_client.cmd_dvr('disconnect')
self._dvr_client.cmd_dvr('initialize')
return (ResourceAgentState.INACTIVE, None)
##############################################################
# TEST event handlers.
##############################################################
##############################################################
# CALIBRATE event handlers.
##############################################################
##############################################################
# BUSY event handlers.
##############################################################
##############################################################
# DIRECT_ACCESS event handlers.
##############################################################
def _handler_direct_access_go_command(self, *args, **kwargs):
log.info("Instrument agent requested to stop direct access mode - %s",
self._da_session_close_reason)
# tell driver to stop direct access mode
next_state, _ = self._dvr_client.cmd_dvr('stop_direct')
# stop DA server
if (self._da_server):
self._da_server.stop()
self._da_server = None
# re-set the 'reason' to be the default
self._da_session_close_reason = 'due to ION request'
return (next_state, None)
##############################################################
# CONNECTION_LOST event handler, available in any active state.
##############################################################
def _handler_connection_lost_driver_event(self, *args, **kwargs):
"""
Handle a connection lost event from the driver.
"""
self._state_when_lost = self._fsm.get_current_state()
return (ResourceAgentState.LOST_CONNECTION, None)
##############################################################
# CONNECTION_LOST event handlers.
##############################################################
def _handler_lost_connection_enter(self, *args, **kwargs):
super(InstrumentAgent, self)._common_state_enter(*args, **kwargs)
log.error('Instrument agent %s lost connection to the device.',
self._proc_name)
self._event_publisher.publish_event(
event_type='ResourceAgentConnectionLostErrorEvent',
origin_type=self.ORIGIN_TYPE,
origin=self.resource_id)
# Setup reconnect timer.
self._autoreconnect_greenlet = gevent.spawn(self._autoreconnect)
def _handler_lost_connection_exit(self, *args, **kwargs):
super(InstrumentAgent, self)._common_state_exit(*args, **kwargs)
if self._autoreconnect_greenlet:
self._autoreconnect_greenlet = None
def _autoreconnect(self):
while self._autoreconnect_greenlet:
gevent.sleep(10)
try:
self._fsm.on_event(ResourceAgentEvent.AUTORECONNECT)
except:
pass
def _handler_lost_connection__reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
return (ResourceAgentState.UNINITIALIZED, result)
def _handler_lost_connection__go_inactive(self, *args, **kwargs):
self._dvr_client.cmd_dvr('initialize')
return (ResourceAgentState.INACTIVE, None)
def _handler_lost_connection__autoreconnect(self, *args, **kwargs):
try:
self._dvr_client.cmd_dvr('connect')
# Reset the connection id and index.
self._asp.reset_connection()
except:
return (None, None)
max_tries = kwargs.get('max_tries', 5)
if not isinstance(max_tries, int) or max_tries < 1:
max_tries = 5
no_tries = 0
while True:
try:
next_state = self._dvr_client.cmd_dvr('discover_state')
break
except Timeout, ResourceError:
no_tries += 1
if no_tries >= max_tries:
log.error("Could not discover instrument state")
next_state = ResourceAgentState.ACTIVE_UNKNOWN
if next_state == ResourceAgentState.IDLE and \
self._state_when_lost == ResourceAgentState.COMMAND:
next_state = ResourceAgentState.COMMAND
return (next_state, None)
##############################################################
# ACTIVE_UNKNOWN event handlers.
##############################################################
def _handler_active_unknown_go_active(self, *args, **kwargs):
max_tries = kwargs.get('max_tries', 5)
if not isinstance(max_tries, int) or max_tries < 1:
max_tries = 5
no_tries = 0
while True:
try:
next_state = self._dvr_client.cmd_dvr('discover_state')
break
except Timeout, ResourceError:
no_tries += 1
if no_tries >= max_tries:
log.error("Could not discover instrument state")
next_state = None
def _handler_active_unknown_go_inactive(self, *args, **kwargs):
self._dvr_client.cmd_dvr('initialize')
return (ResourceAgentState.INACTIVE, None)
def _handler_active_unknown_reset(self, *args, **kwargs):
self._dvr_client.cmd_dvr('initialize')
result = self._stop_driver()
return (ResourceAgentState.UNINITIALIZED, result)
def _handler_active_unknown_go_direct_access(self, *args, **kwargs):
session_timeout = kwargs.get('session_timeout', 10)
inactivity_timeout = kwargs.get('inactivity_timeout', 5)
session_type = kwargs.get('session_type', None)
if not session_type:
raise BadRequest('Instrument parameter error attempting direct access: session_type not present')
log.info("Instrument agent requested to start direct access mode: sessionTO=%d, inactivityTO=%d, session_type=%s",
session_timeout, inactivity_timeout, dir(DirectAccessTypes)[session_type])
# get 'address' of host
hostname = socket.gethostname()
ip_addresses = socket.gethostbyname_ex(hostname)
log.debug("hostname: %s, ip address: %s", hostname, ip_addresses)
# ip_address = ip_addresses[2][0]
ip_address = hostname
# create a DA server instance (TODO: just telnet for now) and pass in callback method
try:
self._da_server = DirectAccessServer(session_type,
self._da_server_input_processor,
ip_address,
session_timeout,
inactivity_timeout)
except Exception as ex:
log.warning("InstrumentAgent: failed to start DA Server <%s>",ex)
raise ex
# get the connection info from the DA server to return to the user
port, token = self._da_server.get_connection_info()
result = {'ip_address':ip_address, 'port':port, 'token':token}
# tell driver to start direct access mode
next_state, _ = self._dvr_client.cmd_dvr('start_direct')
return (next_state, result)
##############################################################
# Asynchronous driver event callback and handlers.
##############################################################
def evt_recv(self, evt):
"""
Route async event received from driver.
"""
log.info('Instrument agent %s got async driver event %s', self.id, evt)
try:
type = evt['type']
val = evt['value']
ts = evt['time']
except KeyError, ValueError:
log.error('Instrument agent %s received driver event %s has missing required fields.',
self.id, evt)
return
if type == DriverAsyncEvent.STATE_CHANGE:
self._async_driver_event_state_change(val, ts)
elif type == DriverAsyncEvent.CONFIG_CHANGE:
self._async_driver_event_config_change(val, ts)
elif type == DriverAsyncEvent.SAMPLE:
self._async_driver_event_sample(val, ts)
elif type == DriverAsyncEvent.ERROR:
self._async_driver_event_error(val, ts)
elif type == DriverAsyncEvent.RESULT:
self._async_driver_event_result(val, ts)
elif type == DriverAsyncEvent.DIRECT_ACCESS:
self._async_driver_event_direct_access(val, ts)
elif type == DriverAsyncEvent.AGENT_EVENT:
self._async_driver_event_agent_event(val, ts)
else:
log.error('Instrument agent %s received unknown driver event %s.',
self._proc_name, str(evt))
def _async_driver_event_state_change(self, val, ts):
"""
Publish driver state change event.
"""
try:
log.info('Instrument agent %s driver state change: %s',
self._proc_name, val)
event_data = { 'state' : val }
self._event_publisher.publish_event(
event_type='ResourceAgentResourceStateEvent',
origin_type=self.ORIGIN_TYPE,
origin=self.resource_id, **event_data)
except:
log.error('Instrument agent %s could not publish driver state change event.',
self._proc_name)
def _async_driver_event_config_change(self, val, ts):
"""
Publsih resource config change event and update persisted info.
"""
if self._enable_persistence:
self._set_state('rparams', val)
try:
event_data = {
'config' : val
}
self._event_publisher.publish_event(
event_type='ResourceAgentResourceConfigEvent',
origin_type=self.ORIGIN_TYPE,
origin=self.resource_id, **event_data)
except:
log.error('Instrument agent %s could not publish driver config change event.',
self._proc_name)
def _async_driver_event_sample(self, val, ts):
"""
Publish sample on sample data streams.
"""
"""
quality_flag : ok
preferred_timestamp : driver_timestamp
stream_name : raw
pkt_format_id : JSON_Data
pkt_version : 1
values : [{u'binary': True, u'value_id': u'raw', u'value':
u'MTkuMDYxMiwzLjMzNzkxLCA0NDkuMDA1LCAgIDE0Ljg3MjksIDE1MDUuMTQ3L
CAwMSBGZWIgMjAwMSwgMDE6MDE6MDA='}]
quality_flag : ok
preferred_timestamp : driver_timestamp
stream_name : parsed
pkt_format_id : JSON_Data
pkt_version : 1
values : [{u'value_id': u'temp', u'value': 19.0612},
{u'value_id': u'conductivity', u'value': 3.33791},
{u'value_id': u'pressure', u'value': 449.005}]
u'quality_flag': u'ok',
u'preferred_timestamp': u'port_timestamp',
u'stream_name': u'raw',
u'port_timestamp': 3575139438.357514,
u'pkt_format_id': u'JSON_Data',
u'pkt_version': 1,
u'values': [
{u'binary': True, u'value_id': u'raw', u'value': u'aABlAGEAcgB0AGIAZQBhAHQAXwBpAG4AdABlAHIAdgBhAGwAIAAwAA=='},
{u'value_id': u'length', u'value': 40},
{u'value_id': u'type', u'value': 1},
{u'value_id': u'checksum', u'value': None}
],
u'driver_timestamp': 3575139438.206242
"""
# If the sample event is encoded, load it back to a dict.
if isinstance(val, str):
val = json.loads(val)
self._asp.on_sample(val)
try:
stream_name = val['stream_name']
values = val['values']
for v in values:
value = v['value']
value_id = v['value_id']
self._aam.process_alerts(stream_name=stream_name,
value=value, value_id=value_id)
except Exception as ex:
log.error('Insturment agent %s could not process alerts for driver tomato %s',
self._proc_name, str(val))
def _async_driver_event_error(self, val, ts):
"""
Publish async driver error.
"""
try:
# Publish resource error event.
if isinstance(val, IonException):
event_data = {
'error_type' : str(type(val)),
'error_msg' : val.message,
"error_code" : val.error_code
}
elif isinstance(val, Exception):
event_data = {
'error_type' : str(type(val)),
'error_msg' : val.message
}
else:
event_data = {
'error_msg' : str(val)
}
self._event_publisher.publish_event(
event_type='ResourceAgentErrorEvent',
origin_type=self.ORIGIN_TYPE,
origin=self.resource_id, **event_data)
except:
log.error('Instrument agent %s could not publish driver error event.',
self._proc_name)
def _async_driver_event_result(self, val, ts):
"""
Publish async driver result.
"""
try:
# Publsih async result event.
cmd = val.get('cmd', None)
desc = val.get('desc', None)
event_data = {
'command' : cmd,
'desc' : desc,
'result' : val
}
self._event_publisher.publish_event(
event_type='ResourceAgentAsyncResultEvent',
origin_type=self.ORIGIN_TYPE,
origin=self.resource_id, **event_data)
except:
log.error('Instrument agent %s could not publish driver result event.',
self._proc_name)
def _async_driver_event_direct_access(self, val, ts):
"""
Send async DA event.
"""
try:
evt = "Unknown"
if (self._da_server):
if (val):
self._da_server.send(val)
else:
log.error('Instrument agent %s error %s processing driver event %s',
self._proc_name, '<no value present in event>', str(evt))
else:
log.error('Instrument agent %s error %s processing driver event %s',
self._proc_name, '<no DA server>', str(evt))
except:
log.error('Instrument agent %s could not publish driver result event.',
self._proc_name)
def _async_driver_event_agent_event(self, val, ts):
"""
Driver initiated agent FSM event.
"""
try:
self._fsm.on_event(val)
except:
log.warning('Instrument agent %s error processing asynchronous agent event %s', self.id, val)
##############################################################
# FSM setup.
##############################################################
def _construct_fsm(self):
"""
Setup instrument agent FSM.
"""
# Here we could define subsets of states and events to specialize fsm.
# Construct default state machine states and handlers.
super(InstrumentAgent, self)._construct_fsm()
# UNINITIALIZED state event handlers.
self._fsm.add_handler(ResourceAgentState.UNINITIALIZED, ResourceAgentEvent.INITIALIZE, self._handler_uninitialized_initialize)
# Instrument agents do not currently use POWERED_DOWN.
# INACTIVE state event handlers.
self._fsm.add_handler(ResourceAgentState.INACTIVE, ResourceAgentEvent.RESET, self._handler_inactive_reset)
self._fsm.add_handler(ResourceAgentState.INACTIVE, ResourceAgentEvent.GO_ACTIVE, self._handler_inactive_go_active)
self._fsm.add_handler(ResourceAgentState.INACTIVE, ResourceAgentEvent.GET_RESOURCE_STATE, self._handler_get_resource_state)
self._fsm.add_handler(ResourceAgentState.INACTIVE, ResourceAgentEvent.PING_RESOURCE, self._handler_ping_resource)
# IDLE state event handlers.
self._fsm.add_handler(ResourceAgentState.IDLE, ResourceAgentEvent.RESET, self._handler_idle_reset)
self._fsm.add_handler(ResourceAgentState.IDLE, ResourceAgentEvent.GO_INACTIVE, self._handler_idle_go_inactive)
self._fsm.add_handler(ResourceAgentState.IDLE, ResourceAgentEvent.RUN, self._handler_idle_run)
self._fsm.add_handler(ResourceAgentState.IDLE, ResourceAgentEvent.GET_RESOURCE_STATE, self._handler_get_resource_state)
self._fsm.add_handler(ResourceAgentState.IDLE, ResourceAgentEvent.PING_RESOURCE, self._handler_ping_resource)
self._fsm.add_handler(ResourceAgentState.IDLE, ResourceAgentEvent.LOST_CONNECTION, self._handler_connection_lost_driver_event)
# STOPPED state event handlers.
self._fsm.add_handler(ResourceAgentState.STOPPED, ResourceAgentEvent.RESET, self._handler_stopped_reset)
self._fsm.add_handler(ResourceAgentState.STOPPED, ResourceAgentEvent.GO_INACTIVE, self._handler_stopped_go_inactive)
self._fsm.add_handler(ResourceAgentState.STOPPED, ResourceAgentEvent.RESUME, self._handler_stopped_resume)
self._fsm.add_handler(ResourceAgentState.STOPPED, ResourceAgentEvent.CLEAR, self._handler_stopped_clear)
self._fsm.add_handler(ResourceAgentState.STOPPED, ResourceAgentEvent.GET_RESOURCE_STATE, self._handler_get_resource_state)
self._fsm.add_handler(ResourceAgentState.STOPPED, ResourceAgentEvent.PING_RESOURCE, self._handler_ping_resource)
self._fsm.add_handler(ResourceAgentState.STOPPED, ResourceAgentEvent.LOST_CONNECTION, self._handler_connection_lost_driver_event)
# COMMAND state event handlers.
self._fsm.add_handler(ResourceAgentState.COMMAND, ResourceAgentEvent.RESET, self._handler_command_reset)
self._fsm.add_handler(ResourceAgentState.COMMAND, ResourceAgentEvent.GO_INACTIVE, self._handler_command_go_inactive)
self._fsm.add_handler(ResourceAgentState.COMMAND, ResourceAgentEvent.CLEAR, self._handler_command_clear)
self._fsm.add_handler(ResourceAgentState.COMMAND, ResourceAgentEvent.PAUSE, self._handler_command_pause)
self._fsm.add_handler(ResourceAgentState.COMMAND, ResourceAgentEvent.GET_RESOURCE, self._handler_get_resource)
self._fsm.add_handler(ResourceAgentState.COMMAND, ResourceAgentEvent.SET_RESOURCE, self._handler_set_resource)
self._fsm.add_handler(ResourceAgentState.COMMAND, ResourceAgentEvent.EXECUTE_RESOURCE, self._handler_execute_resource)
self._fsm.add_handler(ResourceAgentState.COMMAND, ResourceAgentEvent.GO_DIRECT_ACCESS, self._handler_command_go_direct_access)
self._fsm.add_handler(ResourceAgentState.COMMAND, ResourceAgentEvent.GET_RESOURCE_CAPABILITIES, self._handler_get_resource_capabilities)
self._fsm.add_handler(ResourceAgentState.COMMAND, ResourceAgentEvent.GET_RESOURCE_STATE, self._handler_get_resource_state)
self._fsm.add_handler(ResourceAgentState.COMMAND, ResourceAgentEvent.PING_RESOURCE, self._handler_ping_resource)
self._fsm.add_handler(ResourceAgentState.COMMAND, ResourceAgentEvent.LOST_CONNECTION, self._handler_connection_lost_driver_event)
# STREAMING state event handlers.
self._fsm.add_handler(ResourceAgentState.STREAMING, ResourceAgentEvent.RESET, self._handler_streaming_reset)
self._fsm.add_handler(ResourceAgentState.STREAMING, ResourceAgentEvent.GO_INACTIVE, self._handler_streaming_go_inactive)
self._fsm.add_handler(ResourceAgentState.STREAMING, ResourceAgentEvent.GET_RESOURCE, self._handler_get_resource)
self._fsm.add_handler(ResourceAgentState.STREAMING, ResourceAgentEvent.EXECUTE_RESOURCE, self._handler_execute_resource)
self._fsm.add_handler(ResourceAgentState.STREAMING, ResourceAgentEvent.GET_RESOURCE_CAPABILITIES, self._handler_get_resource_capabilities)
self._fsm.add_handler(ResourceAgentState.STREAMING, ResourceAgentEvent.GET_RESOURCE_STATE, self._handler_get_resource_state)
self._fsm.add_handler(ResourceAgentState.STREAMING, ResourceAgentEvent.PING_RESOURCE, self._handler_ping_resource)
self._fsm.add_handler(ResourceAgentState.STREAMING, ResourceAgentEvent.LOST_CONNECTION, self._handler_connection_lost_driver_event)
# TEST state event handlers.
self._fsm.add_handler(ResourceAgentState.TEST, ResourceAgentEvent.GET_RESOURCE, self._handler_get_resource)