-
Notifications
You must be signed in to change notification settings - Fork 2
/
acquisition_trigger_plugin.py
141 lines (113 loc) · 6.01 KB
/
acquisition_trigger_plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
This module provides the acquisition trigger plugin whose job it is to look at interesting events and query for
raw data when interesting events occur.
"""
import multiprocessing
import time
import typing
import zmq
import config
import plugins.base_plugin
import protobuf.opq_pb2
import protobuf.util
class AcquisitionTriggerPlugin(plugins.base_plugin.MaukaPlugin):
    """
    Plugin that reacts to Makai trigger messages by pushing raw-data event requests to Makai.

    The topics "VoltageEvent", "FrequencyEvent" and "OtherEvent" are handed to the base plugin on construction.
    A per-event-type deadzone limits how often raw data is actually requested: while an event type is inside
    its deadzone, the request is still sent, but with its ``request_data`` flag set to False.
    """
    NAME = "AcquisitionTriggerPlugin"

    def __init__(self, conf: config.MaukaConfig, exit_event: multiprocessing.Event):
        """ Initializes this plugin

        :param conf: Configuration dictionary
        :param exit_event: Event forwarded unchanged to the base plugin constructor
        """
        topics = ["VoltageEvent", "FrequencyEvent", "OtherEvent"]
        super().__init__(conf, topics, AcquisitionTriggerPlugin.NAME, exit_event)

        # ZeroMQ context owning the socket below
        self.zmq_req_ctx = zmq.Context()

        # ZeroMQ PUSH socket over which serialized event requests are sent
        # noinspection PyUnresolvedReferences
        # pylint: disable=E1101
        self.push_socket = self.zmq_req_ctx.socket(zmq.PUSH)

        # Number of ms before an event that we should also request data
        self.ms_before = int(self.config.get("plugins.AcquisitionTriggerPlugin.msBefore"))

        # Number of ms after an event that we should also request data
        self.ms_after = int(self.config.get("plugins.AcquisitionTriggerPlugin.msAfter"))

        # Number of seconds after a data request during which no further raw data is requested
        self.s_dead_zone = int(self.config.get("plugins.AcquisitionTriggerPlugin.sDeadZoneAfterTrigger"))

        # Event type -> time.time() value of the last trigger of that type that requested data
        self.event_type_to_last_timestamp = {}

        # Event type -> str() of the last serialized request built for that type
        self.event_type_to_last_event = {}

        self.push_socket.connect(self.config.get("zmq.makai.push.interface"))

    def request_event_message(self, start_ms: int, end_ms: int, trigger_type: str, percent_magnitude: float,
                              box_ids: typing.List[int], requestee: str, description: str, request_data: bool):
        """ Builds and serializes a protobuf event request message

        The requested window is widened by the configured padding: ``ms_before`` is subtracted from the start
        and ``ms_after`` is added to the end.

        :param start_ms: Start time in ms since the epoch
        :param end_ms: End time in ms since the epoch
        :param trigger_type: Why are we requesting data
        :param percent_magnitude: The percentage of the power value from steady state
        :param box_ids: List of box ids included in event
        :param requestee: The plugin or analysis that is requesting data
        :param description: Human description filled in by plugin
        :param request_data: Whether or not data should actually be requested
        :return: A serialized event message
        """
        padded_start_ms = start_ms - self.ms_before
        padded_end_ms = end_ms + self.ms_after

        request = protobuf.opq_pb2.RequestEventMessage()
        request.start_timestamp_ms_utc = padded_start_ms
        request.end_timestamp_ms_utc = padded_end_ms
        request.trigger_type = trigger_type
        request.percent_magnitude = percent_magnitude
        # pylint: disable=E1101
        request.box_ids.extend(box_ids)
        request.requestee = requestee
        request.description = description
        request.request_data = request_data
        return request.SerializeToString()

    def is_deadzone(self, event_type, now) -> bool:
        """Determines if we are currently in a deadzone

        :param event_type: The current event type
        :param now: The current time
        :return: True when a timestamp is stored for this event type and no more than ``s_dead_zone`` has
                 elapsed since it; False otherwise (including when nothing is stored for the type)
        """
        try:
            last_trigger = self.event_type_to_last_timestamp[event_type]
        except KeyError:
            # Nothing recorded for this event type yet, so there is no deadzone to honour.
            return False
        return now - last_trigger <= self.s_dead_zone

    def get_status(self):
        """Returns the two per-event-type bookkeeping dictionaries formatted as a single string

        :return: The last-timestamp mapping followed by the last-event mapping, separated by a space
        """
        timestamps = self.event_type_to_last_timestamp
        events = self.event_type_to_last_event
        return "{} {}".format(timestamps, events)

    def on_message(self, topic, mauka_message):
        """Subscribed messages appear here

        :param topic: The topic that this message is associated with
        :param mauka_message: The message
        """
        if not protobuf.util.is_makai_trigger(mauka_message):
            self.logger.error("Received incorrect mauka message [%s] in AcquisitionTriggerPlugin",
                              protobuf.util.which_message_oneof(mauka_message))
            return

        self.debug("on_message {}".format(mauka_message))
        trigger = mauka_message.makai_trigger
        event_type = trigger.event_type
        now = time.time()

        in_deadzone = self.is_deadzone(event_type, now)
        if in_deadzone:
            self.debug("In deadzone")
        else:
            self.debug("Not in deadzone")
            # Only a trigger that actually requests data restarts the deadzone window.
            self.event_type_to_last_timestamp[event_type] = now
        request_data = not in_deadzone

        start_ts_ms_utc = trigger.event_start_timestamp_ms
        end_ts_ms_utc = trigger.event_end_timestamp_ms

        # pylint: disable=E1101
        trigger_type = protobuf.opq_pb2.RequestEventMessage.TriggerType.Value(event_type)
        device_ids = [int(trigger.box_id)]
        description = "{} {}-{}".format(str(trigger_type), start_ts_ms_utc, end_ts_ms_utc)

        event_msg = self.request_event_message(start_ts_ms_utc, end_ts_ms_utc, trigger_type, trigger.max_value,
                                               device_ids, "AcquisitionTriggerPlugin", description, request_data)
        self.event_type_to_last_event[event_type] = str(event_msg)

        try:
            self.debug("Sending event msg: {}".format(event_msg))
            self.push_socket.send(event_msg)
        except zmq.ZMQError as err:
            self.logger.error("Error sending req to Makai: %s", str(err))