-
Notifications
You must be signed in to change notification settings - Fork 46
/
high_availability_agent.py
185 lines (138 loc) · 6.49 KB
/
high_availability_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
from pyon.agent.simple_agent import SimpleResourceAgent
from pyon.event.event import EventPublisher
from pyon.public import log
from interface.objects import AgentCommand, ProcessDefinition, ProcessSchedule, ProcessStateEnum
from interface.services.cei.iprocess_dispatcher_service import ProcessDispatcherServiceClient
from ion.agents.cei.util import looping_call
from ion.services.cei.process_dispatcher_service import _core_process_definition_from_ion
try:
from epu.highavailability.core import HighAvailabilityCore
import epu.highavailability.policy as policy
except ImportError:
HighAvailabilityCore = None
#raise
"""
@package ion.agents.cei.high_availability_agent
@file ion/agents/cei/high_availability_agent.py
@author Patrick Armstrong
@brief Pyon port of HAAgent
"""
# Fallback for CFG.highavailability.policy.interval: the interval handed to
# looping_call for re-applying the HA policy (presumably seconds -- confirm
# against looping_call).
DEFAULT_INTERVAL = 5
class HighAvailabilityAgent(SimpleResourceAgent):
    """Agent to manage high availability processes.

    on_init() reads its settings from CFG.highavailability, builds a
    HighAvailabilityCore and starts a looping call that runs the core's
    apply_policy at the configured interval. The rcmd_* methods delegate
    directly to that core.
    """

    def __init__(self):
        log.debug("HighAvailabilityAgent init")
        SimpleResourceAgent.__init__(self)

    def on_init(self):
        """Build the HA core from configuration and start the policy loop.

        If HighAvailabilityCore could not be imported, an error is logged and
        the method returns without creating the core or the policy loop.

        Raises:
            Exception: if CFG.highavailability.policy.name is missing, or
                names a policy that is not in policy.policy_map.
        """
        if not HighAvailabilityCore:
            msg = "HighAvailabilityCore isn't available. Use autolaunch.cfg buildout"
            log.error(msg)
            return
        log.debug("HighAvailabilityCore Pyon on_init")

        policy_name = self.CFG.get_safe("highavailability.policy.name")
        if policy_name is None:
            msg = "HA service requires a policy name at CFG.highavailability.policy.name"
            raise Exception(msg)
        try:
            # Policy names are looked up lower-cased in the policy map.
            self.policy = policy.policy_map[policy_name.lower()]
        except KeyError:
            raise Exception("HA Service doesn't support '%s' policy" % policy_name)

        policy_parameters = self.CFG.get_safe("highavailability.policy.parameters")
        self.policy_interval = self.CFG.get_safe("highavailability.policy.interval",
                DEFAULT_INTERVAL)

        cfg = self.CFG.get_safe("highavailability")
        pds = self.CFG.get_safe("highavailability.process_dispatchers", [])
        process_definition_id = self.CFG.get_safe("highavailability.process_definition_id")
        process_configuration = self.CFG.get_safe("highavailability.process_configuration")
        aggregator_config = self.CFG.get_safe("highavailability.aggregator")

        # TODO: Allow other core class?
        self.core = HighAvailabilityCore(cfg, ProcessDispatcherSimpleAPIClient,
                pds, self.policy, process_definition_id=process_definition_id,
                parameters=policy_parameters,
                process_configuration=process_configuration,
                aggregator_config=aggregator_config)

        self.policy_thread = looping_call(self.policy_interval, self.core.apply_policy)

    def on_quit(self):
        """Stop the periodic policy loop, if it was ever started."""
        policy_thread = getattr(self, "policy_thread", None)
        if policy_thread is None:
            # on_init() returned early (HighAvailabilityCore unavailable) or
            # raised before the loop was created; there is nothing to stop,
            # and shutdown must not fail with an AttributeError.
            return
        policy_thread.kill(block=True, timeout=3)

    def rcmd_reconfigure_policy(self, new_policy):
        """Service operation: Change the parameters of the policy used for service

        @param new_policy: parameters of policy
        @return:
        """
        self.core.reconfigure_policy(new_policy)

    def rcmd_status(self):
        """Service operation: Get the status of the HA Service

        @return: {PENDING, READY, STEADY, BROKEN}
        """
        return self.core.status()

    def rcmd_dump(self):
        """Service operation: Return whatever the HA core's dump() produces."""
        return self.core.dump()
class HighAvailabilityAgentClient(object):
    """Thin wrapper that issues HA agent commands through an agent client.

    Each public method packs its arguments into an AgentCommand and returns
    the result of the wrapped client's execute() call.
    """

    def __init__(self, agent_client):
        self.client = agent_client

    def _send(self, command, *arguments):
        """Execute ``command`` with ``arguments`` as the AgentCommand args list."""
        request = AgentCommand(command=command, args=list(arguments))
        return self.client.execute(request)

    def status(self):
        """Send the 'status' command with no arguments."""
        return self._send('status')

    def reconfigure_policy(self, new_policy):
        """Send the 'reconfigure_policy' command with ``new_policy`` as its only argument."""
        return self._send('reconfigure_policy', new_policy)

    def dump(self):
        """Send the 'dump' command with no arguments."""
        return self._send('dump')
class ProcessDispatcherSimpleAPIClient(object):
    """Adapter around a Process Dispatcher client.

    Process records returned by the wrapped client are reported as plain
    dicts with 'upid' and 'state' keys, with process states translated
    through state_map.
    """

    # Fallback state reported for any PD state that has no entry in
    # state_map (including None).
    unknown_state = "400-PENDING"

    # Translation from ProcessStateEnum values to the state strings reported
    # in the returned dicts.
    state_map = {
        ProcessStateEnum.SPAWN: '500-RUNNING',
        ProcessStateEnum.TERMINATE: '700-TERMINATED',
        ProcessStateEnum.ERROR: '850-FAILED'
    }

    def __init__(self, name, real_client=None, **kwargs):
        """Wrap ``real_client``, or build a ProcessDispatcherServiceClient
        addressed to ``name`` (forwarding ``kwargs``) when none is given.
        """
        if real_client is None:
            real_client = ProcessDispatcherServiceClient(to_name=name, **kwargs)
        self.real_client = real_client
        self.event_pub = EventPublisher()

    def _summarize(self, proc):
        """Return the {'upid', 'state'} dict describing a PD process record."""
        translated = self.state_map.get(proc.process_state, self.unknown_state)
        return {'upid': proc.process_id, 'state': translated}

    def create_definition(self, definition_id, definition_type, executable,
            name=None, description=None):
        """Create a process definition and return the wrapped client's result.

        Only the 'module' and 'class' entries of ``executable`` are copied.
        """
        # note: we lose the description
        ion_definition = ProcessDefinition(name=name)
        ion_definition.executable = dict(
            (key, executable.get(key)) for key in ('module', 'class'))
        ion_definition.definition_type = definition_type
        return self.real_client.create_process_definition(ion_definition,
                definition_id)

    def describe_definition(self, definition_id):
        """Read a process definition and convert it with
        _core_process_definition_from_ion.
        """
        ion_definition = self.real_client.read_process_definition(definition_id)
        return _core_process_definition_from_ion(ion_definition)

    def schedule_process(self, upid, definition_id, configuration=None,
            subscribers=None, constraints=None, queueing_mode=None,
            restart_mode=None, execution_engine_id=None, node_exclusive=None):
        """Schedule a process under ``upid`` and return its {'upid', 'state'} dict.

        Publishes a ProcessLifecycleEvent (state SPAWN) with the definition's
        name as origin before scheduling. Only ``upid``, ``definition_id`` and
        ``configuration`` are used; the remaining parameters are accepted but
        not forwarded to the wrapped client.
        """
        ion_definition = self.real_client.read_process_definition(definition_id)
        self.event_pub.publish_event(event_type="ProcessLifecycleEvent",
                origin=ion_definition.name, origin_type="DispatchedHAProcess",
                state=ProcessStateEnum.SPAWN)

        # NOTE(review): the id returned by create_process is never used; the
        # process is scheduled under the caller-supplied upid instead. The
        # call is kept because it may have side effects on the dispatcher --
        # confirm whether it is still required.
        self.real_client.create_process(definition_id)

        schedule = ProcessSchedule()
        scheduled_id = self.real_client.schedule_process(definition_id,
                schedule, configuration=configuration, process_id=upid)
        return self._summarize(self.real_client.read_process(scheduled_id))

    def terminate_process(self, pid):
        """Cancel process ``pid`` and return the wrapped client's result."""
        return self.real_client.cancel_process(pid)

    def describe_processes(self):
        """Return a {'upid', 'state'} dict for each process the wrapped client lists."""
        return [self._summarize(proc)
                for proc in self.real_client.list_processes()]