Skip to content

Commit

Permalink
Merge pull request #788 from MauriceManning/pubaggalert
Browse files Browse the repository at this point in the history
add logic to publish aggregate alert events
  • Loading branch information
MauriceManning committed Mar 27, 2013
2 parents 0315557 + 2030dab commit 2ba2fb8
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 19 deletions.
42 changes: 36 additions & 6 deletions ion/agents/instrument/instrument_agent.py
Expand Up @@ -58,7 +58,7 @@

# MI imports
from ion.core.includes.mi import DriverAsyncEvent
from interface.objects import StreamRoute
from interface.objects import StreamRoute, StreamAlertType
from interface.objects import AgentCommand, StatusType, DeviceStatusEnum, AggregateStatusType

class InstrumentAgentState():
Expand Down Expand Up @@ -817,27 +817,57 @@ def _process_aggregate_alerts(self):
loop thru alerts list and retrieve status of any alert that contributes to the aggregate status and update the state
"""
#init working status
updated_status = {}
for aggregate_type in AggregateStatusType._str_map.keys():
if aggregate_type is not AggregateStatusType.AGGREGATE_OTHER:
self.aparam_aggstatus[aggregate_type] = DeviceStatusEnum.STATUS_UNKNOWN
updated_status[aggregate_type] = DeviceStatusEnum.STATUS_UNKNOWN

for a in self.aparam_alerts:
log.debug('_process_aggregate_alerts a: %s', a)
curr_state = a.get_status()
if a._aggregate_type is not AggregateStatusType.AGGREGATE_OTHER:
#get the current value for this aggregate status
current_agg_state = self.aparam_aggstatus[ a._aggregate_type ]
current_agg_state = updated_status[ a._aggregate_type ]
if a._status:
# this alert is not 'tripped' so the status is OK
#check behavior here. if there are any unknowns then set to agg satus to unknown?
log.debug('_process_aggregate_alerts Clear')
if current_agg_state is DeviceStatusEnum.STATUS_UNKNOWN:
self.aparam_aggstatus[ a._aggregate_type ] = DeviceStatusEnum.STATUS_OK
updated_status[ a._aggregate_type ] = DeviceStatusEnum.STATUS_OK

else:
#the alert is active, either a warning or an alarm
if a._alert_type is StreamAlertType.ALARM:
self.aparam_aggstatus[ a._aggregate_type ] = DeviceStatusEnum.STATUS_CRITICAL
log.debug('_process_aggregate_alerts Critical')
updated_status[ a._aggregate_type ] = DeviceStatusEnum.STATUS_CRITICAL
elif a._alert_type is StreamAlertType.WARNING and current_agg_state is not DeviceStatusEnum.STATUS_CRITICAL:
self.aparam_aggstatus[ a._aggregate_type ] = DeviceStatusEnum.STATUS_WARNING
log.debug('_process_aggregate_alerts Warn')
updated_status[ a._aggregate_type ] = DeviceStatusEnum.STATUS_WARNING

#compare old state with new state and publish alerts for any agg status that has changed.
for aggregate_type in AggregateStatusType._str_map.keys():
if aggregate_type is not AggregateStatusType.AGGREGATE_OTHER:
if updated_status[aggregate_type] != self.aparam_aggstatus[aggregate_type]:
log.debug('_process_aggregate_alerts pubevent')
self._publish_agg_status_event(aggregate_type, updated_status[aggregate_type],self.aparam_aggstatus[aggregate_type])
self.aparam_aggstatus[aggregate_type] = updated_status[aggregate_type]

return

def _publish_agg_status_event(self, status_type, new_status, old_status):
# Publish resource config change event.
try:
self._event_publisher.publish_event(
event_type='DeviceAggregateStatusEvent',
origin_type=self.ORIGIN_TYPE,
origin=self.resource_id,
status_name=status_type,
status=new_status,
prev_status=old_status)
except:
log.error('Instrument agent %s could not publish aggregate status change event.',
self._proc_name)

return


Expand Down
71 changes: 58 additions & 13 deletions ion/services/sa/test/test_activate_instrument.py
Expand Up @@ -297,7 +297,7 @@ def test_activateInstrumentSample(self):
inteval.
"""

alert_def1 = {
temp_alert_def = {
'name' : 'temperature_warning_interval',
'stream_name' : 'parsed',
'message' : 'Temperature is below the normal range of 50.0 and above.',
Expand All @@ -311,6 +311,36 @@ def test_activateInstrumentSample(self):
'alert_class' : 'IntervalAlert'
}

pressure_alert_def = {
'name' : 'pressure_warning_interval',
'stream_name' : 'parsed',
'message' : 'Pressure is below the normal range of 50.0 and above.',
'alert_type' : StreamAlertType.WARNING,
'aggregate_type' : AggregateStatusType.AGGREGATE_DATA,
'value_id' : 'pressure',
'resource_id' : instDevice_id,
'origin_type' : 'device',
'lower_bound' : 50.0,
'lower_rel_op' : '<',
'alert_class' : 'IntervalAlert'
}

late_data_alert_def = {
'name' : 'late_data_warning',
'stream_name' : 'parsed',
'message' : 'Expected data has not arrived.',
'alert_type' : StreamAlertType.WARNING,
'aggregate_type' : AggregateStatusType.AGGREGATE_COMMS,
'value_id' : None,
'resource_id' : instDevice_id,
'origin_type' : 'device',
'time_delta' : 2,
'get_state' : ResourceAgentState.STREAMING,
'alert_class' : 'LateDataAlert'
}




port_agent_config = {
'device_addr': CFG.device.sbe37.host,
Expand All @@ -327,7 +357,7 @@ def test_activateInstrumentSample(self):
instAgentInstance_obj = IonObject(RT.InstrumentAgentInstance, name='SBE37IMAgentInstance',
description="SBE37IMAgentInstance",
port_agent_config = port_agent_config,
alerts= [alert_def1])
alerts= [temp_alert_def, late_data_alert_def])


instAgentInstance_id = self.imsclient.create_instrument_agent_instance(instAgentInstance_obj,
Expand Down Expand Up @@ -423,32 +453,48 @@ def start_instrument_agent():

#setup a subscriber to alarm events from the device
self._events_received= []
self._event_count = 0
self._samples_out_of_range = 0
self._samples_complete = False
self._async_sample_result = AsyncResult()
self._agg_event_recieved = False
self._alert_event_recieved = False

def consume_event(*args, **kwargs):
log.debug('TestActivateInstrument recieved ION event: args=%s, kwargs=%s, event=%s.',
str(args), str(kwargs), str(args[0]))
self._events_received.append(args[0])
self._event_count = len(self._events_received)
self._async_sample_result.set()
retval = self._ia_client.get_agent(['aggstatus'])['aggstatus']
log.debug('TestActivateInstrument consume_event aggStatus: %s', retval)

event = args[0]
log.debug('TestActivateInstrument consume_event event: %s', event)

if event.type_ is 'StreamAlertEvent':
self._alert_event_recieved = True
elif event.type_ is 'DeviceAggregateStatusEvent':
self._agg_event_recieved = True

#check that the current agg status matches the event sub_type
retval = self._ia_client.get_agent(['aggstatus'])['aggstatus']
log.debug('TestActivateInstrument consume_event aggStatus: %s', retval)
if event.sub_type == 'WARNING':
self.assertEqual(retval[AggregateStatusType.AGGREGATE_DATA], DeviceStatusEnum.STATUS_WARNING)
elif event.sub_type == 'ALL_CLEAR':
self.assertEqual(retval[AggregateStatusType.AGGREGATE_DATA], DeviceStatusEnum.STATUS_OK)

#once the alert is recived and the aggregate status is also received then finish
if self._agg_event_recieved and self._alert_event_recieved:
self._async_sample_result.set()


self._event_subscriber = EventSubscriber(
event_type= 'StreamAlertEvent',
callback=consume_event,
origin=instDevice_id)
self._event_subscriber.start()

self._event_subscriber = EventSubscriber(
event_type= 'DeviceAggregateStatusEvent',
callback=consume_event,
origin=instDevice_id)
self._event_subscriber.start()


#cleanup
self.addCleanup(self.imsclient.stop_instrument_agent_instance,
Expand Down Expand Up @@ -532,7 +578,6 @@ def stop_subscriber():
reply = self._ia_client.execute_agent(cmd)
log.debug("test_activateInstrumentSample: return from reset %s" , str(reply))

self._samples_complete = True

#--------------------------------------------------------------------------------
# Now get the data in one chunk using an RPC Call to start_retreive
Expand All @@ -543,19 +588,19 @@ def stop_subscriber():
rdt = RecordDictionaryTool.load_from_granule(replay_data)
log.debug("test_activateInstrumentSample: RDT parsed: %s", str(rdt.pretty_print()) )
temp_vals = rdt['temp']
pressure_vals = rdt['pressure']
self.assertEquals(len(temp_vals) , 10)
log.debug("test_activateInstrumentSample: all temp_vals: %s", temp_vals )
log.debug("test_activateInstrumentSample: all pressure_vals: %s", pressure_vals )

out_of_range_temp_vals = [i for i in temp_vals if i < 50.0]
log.debug("test_activateInstrumentSample: Out_of_range_temp_vals: %s", out_of_range_temp_vals )
self._samples_out_of_range = len(out_of_range_temp_vals)

# if no bad values were produced, then do not wait for an event
if self._samples_out_of_range == 0:
if len(out_of_range_temp_vals) == 0:
self._async_sample_result.set()

log.debug("test_activateInstrumentSample: _events_received: %s", self._events_received )
log.debug("test_activateInstrumentSample: _event_count: %s", self._event_count )

self._async_sample_result.get(timeout=CFG.endpoint.receive.timeout)

Expand Down

0 comments on commit 2ba2fb8

Please sign in to comment.