-
Notifications
You must be signed in to change notification settings - Fork 8
/
managers.py
258 lines (212 loc) · 9.2 KB
/
managers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# -*- coding: utf-8 -*-
import hashlib
import psutil
from amplify.agent.common.context import context
from amplify.agent.common.util import subp
from amplify.agent.managers.abstract import launch_method_supported
from amplify.agent.data.eventd import INFO
from amplify.ext.abstract.manager import ExtObjectManager
from amplify.ext.mysql.util import PS_CMD, master_parser, ps_parser
from amplify.ext.mysql import AMPLIFY_EXT_KEY
from amplify.agent.common.util.configtypes import boolean
from amplify.ext.mysql.objects import MySQLObject
__author__ = "Andrew Alexeev"
__copyright__ = "Copyright (C) Nginx Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"
class MySQLManager(ExtObjectManager):
"""
Manager for MySQL objects.
"""
ext = AMPLIFY_EXT_KEY
name = 'mysql_manager'
type = 'mysql'
types = ('mysql',)
def _discover_objects(self):
# save the current ids
existing_hashes = [
obj.definition_hash
for obj in self.objects.find_all(types=self.types)
]
discovered_hashes = []
if boolean(context.app_config['mysql'].get('remote', False)):
mysql_daemons = self._find_remote()
else:
mysql_daemons = self._find_local()
while len(mysql_daemons):
try:
data = mysql_daemons.pop()
definition = {
'type': 'mysql',
'local_id': data['local_id'],
'root_uuid': context.uuid
}
definition_hash = MySQLObject.hash(definition)
discovered_hashes.append(definition_hash)
if definition_hash not in existing_hashes:
# New object -- create it
new_obj = MySQLObject(data=data)
# Send discover event.
new_obj.eventd.event(
level=INFO,
message='mysqld process found, pid %s' % new_obj.pid
)
self.objects.register(new_obj, parent_id=self.objects.root_id)
elif definition_hash in existing_hashes:
for obj in self.objects.find_all(types=self.types):
if obj.definition_hash == definition_hash:
current_obj = obj
break
if current_obj.pid != data['pid']:
# PIDs changed... MySQL must have been restarted
context.log.debug(
'mysqld was restarted (pid was %s now %s)' % (
current_obj.pid, data['pid']
)
)
new_obj = MySQLObject(data=data)
# send MySQL restart event
new_obj.eventd.event(
level=INFO,
message='mysqld process was restarted, new pid %s, old pid %s' % (
new_obj.pid,
current_obj.pid
)
)
# stop and un-register children
children_objects = self.objects.find_all(
obj_id=current_obj.id,
children=True,
include_self=False
)
for child_obj in children_objects:
child_obj.stop()
self.objects.unregister(obj=child_obj)
# un-register old object
self.objects.unregister(current_obj)
# stop old object
current_obj.stop()
self.objects.register(new_obj, parent_id=self.objects.root_id)
except psutil.NoSuchProcess:
context.log.debug('mysqld is restarting/reloading, pids are changing, agent is waiting')
# check if we left something in objects (MySQL could be stopped or something)
dropped_hashes = filter(lambda x: x not in discovered_hashes, existing_hashes)
if len(dropped_hashes):
for dropped_hash in dropped_hashes:
for obj in self.objects.find_all(types=self.types):
if obj.definition_hash == dropped_hash:
dropped_obj = obj
break
context.log.debug('mysqld was stopped (pid was %s)' % dropped_obj.pid)
# stop and un-register children
children_objects = self.objects.find_all(
obj_id=dropped_obj.id,
children=True,
include_self=False
)
for child_obj in children_objects:
child_obj.stop()
self.objects.unregister(child_obj)
dropped_obj.stop()
self.objects.unregister(dropped_obj)
@staticmethod
def _find_local(ps=None):
"""
Tries to find all mysqld processes
:param ps: [] of str, used for debugging our parsing logic - should be None most of the time
:return: [] of {} MySQL object definitions
"""
# get ps info
try:
# set ps output to passed param or call subp
ps, _ = (ps, None) if ps is not None else subp.call(PS_CMD)
context.log.debug('ps mysqld output: %s' % ps)
except Exception as e:
# log error
exception_name = e.__class__.__name__
context.log.debug(
'failed to find running mysqld via "%s" due to %s' % (
PS_CMD, exception_name
)
)
context.log.debug('additional info:', exc_info=True)
# If there is a root_object defined, log an event to send to the
# cloud.
if context.objects.root_object:
context.objects.root_object.eventd.event(
level=INFO,
message='no mysqld processes found'
)
# break processing returning a fault-tolerant empty list
return []
if not any('mysqld' in line for line in ps):
context.log.info('no mysqld processes found')
# break processing returning a fault-tolerant empty list
return []
# collect all info about processes
masters = {}
try:
for line in ps:
parsed = ps_parser(line)
# if not parsed - go to the next line
if parsed is None:
continue
pid, ppid, cmd = parsed # unpack values
# match master process
if cmd.split(' ', 1)[0].endswith('mysqld'):
if not launch_method_supported("mysql", ppid):
continue
try:
conf_path = master_parser(cmd)
except Exception as e:
context.log.error('failed to find conf_path for %s' % cmd)
context.log.debug('additional info:', exc_info=True)
else:
# calculate local_id
local_id = hashlib.sha256('%s_%s' % (cmd, conf_path)).hexdigest()
if pid not in masters:
masters[pid] = {}
masters[pid].update({
'cmd': cmd.strip(),
'conf_path': conf_path,
'pid': pid,
'local_id': local_id
})
except Exception as e:
# log error
exception_name = e.__class__.__name__
context.log.error('failed to parse ps results due to %s' % exception_name)
context.log.debug('additional info:', exc_info=True)
# format results
results = []
for payload in masters.itervalues():
# only add payloads that have all the keys
if 'cmd' in payload and 'conf_path' in payload and 'pid' in payload and 'local_id' in payload:
results.append(payload)
else:
context.log.debug('MySQL "_find_all()" found an incomplete entity %s' % payload)
return results
@staticmethod
def _find_remote():
"""
:return: [] of {} MySQL object definition for remote mysqld process
"""
results = []
try:
cmd = "/usr/sbin/mysqld"
conf_path = "/etc/mysql/my.cnf"
# calculate local_id
local_id = hashlib.sha256('%s_%s' % (cmd, conf_path)).hexdigest()
results.append({
'cmd': 'unknown',
'conf_path': 'unknown',
'pid': 'unknown',
'local_id': local_id
})
except Exception as e:
# log error
exception_name = e.__class__.__name__
context.log.error('failed to parse remote mysql results due to %s' % exception_name)
context.log.debug('additional info:', exc_info=True)
return results