# osvcd_collector.py
"""
Collector Thread
"""
import os
import sys
import logging
import datetime
import osvcd_shared as shared
import rcExceptions as ex
from comm import Crypt
from rcGlobalEnv import rcEnv
class Collector(shared.OsvcThread, Crypt):
    """
    The daemon thread feeding the collector: it pushes the daemon status,
    changed service configurations and container information, flushes the
    queued xmlrpc calls, and sends keepalive pings when nothing changed.
    """
    # Seconds between two wakeups of the main loop (also the max staleness
    # of a full daemon status push).
    update_interval = 300
    # Seconds of comm silence after which a keepalive ping is sent, so the
    # collector does not flag this node as stale.
    ping_interval = 60

    def run(self):
        """
        Thread entrypoint. Initialize the logger and the last-sent caches,
        then loop on do() until the thread is asked to stop.
        """
        self.log = logging.getLogger(rcEnv.nodename+".osvcd.collector")
        self.log.info("collector started")
        self.last_comm = None
        self.last_config = {}
        self.last_status = {}
        while True:
            if self.stopped():
                sys.exit(0)
            try:
                self.do()
            except Exception as exc:
                # never let an unexpected error kill the thread
                self.log.exception(exc)

    def get_last_status(self, data):
        """
        Return a (last_status, last_status_changed) tuple, where
        last_status maps (svcname, nodename) to the instance status
        checksum, and last_status_changed lists the "svcname@nodename"
        and "svcname" identifiers whose checksum changed since the
        previous call.
        """
        last_status = {}
        last_status_changed = []
        for nodename, ndata in data["nodes"].items():
            for svcname, sdata in ndata.get("services", {}).get("status", {}).items():
                status_csum = sdata.get("csum")
                prev_status_csum = self.last_status.get((svcname, nodename))
                if status_csum != prev_status_csum:
                    last_status_changed.append(svcname+"@"+nodename)
                    # also record the bare service name once, so callers can
                    # match on service-wide changes
                    if svcname not in last_status_changed:
                        last_status_changed.append(svcname)
                last_status[(svcname, nodename)] = status_csum
        return last_status, last_status_changed

    def get_last_config(self, data):
        """
        Return a (last_config, last_config_changed) tuple, where
        last_config maps svcname to the local service configuration
        checksum, and last_config_changed lists the svcnames whose
        checksum changed since the previous call. Only the local node
        configurations are considered.
        """
        last_config = {}
        last_config_changed = []
        for svcname, sdata in data["nodes"].get(rcEnv.nodename, {}).get("services", {}).get("config", {}).items():
            config_csum = sdata.get("csum")
            prev_config_csum = self.last_config.get(svcname)
            if config_csum != prev_config_csum:
                last_config_changed.append(svcname)
            last_config[svcname] = config_csum
        return last_config, last_config_changed

    def init_collector(self):
        """
        Reinitialize the collector proxy if it is still bound to the
        localhost placeholder address.
        """
        # NOTE(review): detecting the unconfigured proxy through its repr()
        # is fragile but matches how the proxy exposes its endpoint
        if " 127.0.0.1/" in repr(shared.NODE.collector.proxy):
            shared.NODE.collector.init()

    def do(self):
        """
        One pass of the main loop: reload the daemon configuration if
        changed, refresh the collector proxy, push updates, flush queued
        xmlrpc calls, then sleep until the next tick or an early wakeup.
        """
        self.reload_config()
        self.init_collector()
        self.run_collector()
        self.unqueue_xmlrpc()
        with shared.COLLECTOR_TICKER:
            # other threads notify() this condition to trigger an early run
            shared.COLLECTOR_TICKER.wait(self.update_interval)

    def unqueue_xmlrpc(self):
        """
        Drain the shared xmlrpc queue, relaying each (args, kwargs) entry
        to the collector. Errors are logged and do not stop the drain.
        """
        while True:
            try:
                args, kwargs = shared.COLLECTOR_XMLRPC_QUEUE.pop()
            except IndexError:
                # queue is empty
                break
            if len(args) == 0:
                continue
            try:
                shared.NODE.collector.call(*args, **kwargs)
            except Exception as exc:
                self.log.exception(exc)

    def send_containerinfo(self, svcname):
        """
        Push the container information of a service hosting encapsulated
        resources. No-op for unknown services or services with no encap
        resources.
        """
        if svcname not in shared.SERVICES:
            return
        if not shared.SERVICES[svcname].has_encap_resources:
            return
        self.log.info("send service %s container info", svcname)
        with shared.SERVICES_LOCK:
            shared.NODE.collector.call("push_containerinfo", shared.SERVICES[svcname])

    def send_service_config(self, svcname):
        """
        Push a service configuration to the collector. No-op for unknown
        services.
        """
        if svcname not in shared.SERVICES:
            return
        self.log.info("send service %s config", svcname)
        with shared.SERVICES_LOCK:
            shared.NODE.collector.call("push_config", shared.SERVICES[svcname])

    def send_daemon_status(self, data, last_status_changed=None):
        """
        Push the daemon status to the collector, along with the list of
        changed instance identifiers (empty or None means full resync),
        and record the communication timestamp.
        """
        if last_status_changed:
            self.log.info("send daemon status, changed: %s", ", ".join(last_status_changed))
        else:
            self.log.info("send daemon status, resync")
        shared.NODE.collector.call("push_daemon_status", data, last_status_changed)
        self.last_comm = datetime.datetime.utcnow()

    def ping(self):
        """
        Send a keepalive to the collector and record the communication
        timestamp.
        """
        self.log.info("ping the collector")
        shared.NODE.collector.call("daemon_ping")
        self.last_comm = datetime.datetime.utcnow()

    def get_data(self):
        """
        Get a copy of the monitor thread status, expunged from encap
        service instances, to avoid missing changes happening during our
        work. Return None if the monitor thread is not started yet.
        """
        if "monitor" not in shared.THREADS:
            # the monitor thread is not started
            return
        data = shared.THREADS["monitor"].status()
        _data = {
            "nodes": {},
            "services": {},
        }
        # carry over every top-level key except the ones rebuilt below
        for key in data:
            if key not in _data:
                _data[key] = data[key]
        for nodename in data["nodes"]:
            try:
                instances_status = data["nodes"][nodename]["services"]["status"]
                instances_config = data["nodes"][nodename]["services"]["config"]
            except KeyError:
                continue
            for svcname in list(instances_status.keys()):
                if svcname not in instances_config:
                    # deleted service instance
                    continue
                if instances_status[svcname].get("encap") is True:
                    # encap instances are reported by their hosting service
                    continue
                if nodename not in _data["nodes"]:
                    _data["nodes"][nodename] = {
                        "services": {
                            "config": {},
                            "status": {},
                        },
                    }
                _data["nodes"][nodename]["services"]["status"][svcname] = instances_status[svcname]
                _data["nodes"][nodename]["services"]["config"][svcname] = instances_config[svcname]
                _data["services"][svcname] = data["services"][svcname]
        return _data

    def run_collector(self):
        """
        One update pass: push changed service configurations (and the
        matching container information), and if this node is the cluster
        speaker, push the daemon status when instance statuses changed,
        or a keepalive ping when the last communication is older than
        ping_interval.
        """
        data = self.get_data()
        if data is None:
            # monitor thread not started yet
            return
        if len(data["services"]) == 0:
            # nothing to report
            return
        last_config, last_config_changed = self.get_last_config(data)
        for svcname in last_config_changed:
            self.send_service_config(svcname)
            self.send_containerinfo(svcname)
        self.last_config = last_config
        if self.speaker():
            last_status, last_status_changed = self.get_last_status(data)
            if last_status_changed or self.last_comm is None:
                self.send_daemon_status(data, last_status_changed)
            elif self.last_comm <= datetime.datetime.utcnow() - datetime.timedelta(seconds=self.ping_interval):
                # BUGFIX: the ping used to sit behind an update_interval
                # check shadowed by a pass-only ping_interval branch, so it
                # was unreachable and the collector would flag this node
                # as stale. Ping whenever nothing changed for ping_interval.
                self.ping()
            self.last_status = last_status