-
Notifications
You must be signed in to change notification settings - Fork 3
/
job.py
120 lines (91 loc) · 3.62 KB
/
job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Copyright 2016: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import importlib
import json
import logging
import time
import requests
import schedule
from health import config
from health.drivers import utils
from health import storage
# Log line format used by every logger in this process, e.g.
# "[2016-01-01 12:00:00,000] INFO in job: Starting Syncing Job".
LOGGING_FORMAT = '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
# Configure the root logger at import time; this module is run as a script.
logging.basicConfig(format=LOGGING_FORMAT, level=logging.INFO)
def _get_driver(driver_type):
try:
return importlib.import_module("." + driver_type + ".driver",
"health.drivers").Driver
except ImportError:
logging.error("Could not load driver for '{}'".format(driver_type))
raise
def ignore_exceptions(func):
    """Decorator that logs and swallows any exception raised by *func*.

    Intended for periodically scheduled jobs: one failed run should be
    logged but must not kill the scheduler loop. The wrapper returns
    ``None`` when *func* raises.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # logging.exception records the full traceback, which
            # logging.error with a pre-formatted message would lose.
            logging.exception("Caught %s while running '%s' function",
                              e, func.__name__)
    return wrapper
@ignore_exceptions
def job():
    """Sync data from every configured source into the Elastic backend.

    For each source region a driver is instantiated and asked only for data
    newer than the latest timestamp already stored; each returned chunk is
    posted to Elasticsearch via the bulk API. Decorated with
    ``@ignore_exceptions`` so a failing run does not kill the scheduler.
    """
    CONF = config.get_config()
    started_at = time.time()
    logging.info("Starting Syncing Job")
    for src in CONF["sources"]:
        backend_url = "%s/ms_health_%s/service" % (
            CONF["backend"]["elastic"], src["region"])
        # Only the newest stored timestamp matters for incremental sync;
        # the minimum is unused.
        _min_ts, max_ts = utils.get_min_max_timestamps(backend_url,
                                                       "timestamp")
        driver = _get_driver(src["driver"]["type"])(src["driver"])
        data_generator = driver.fetch(latest_aggregated_ts=max_ts)
        logging.info("Start syncing %s region", src["region"])
        for i, data_interval in enumerate(data_generator):
            if not data_interval:
                logging.info("Chunk %s from region %s is already synced.",
                             i, src["region"])
                continue
            lines = []
            for d in data_interval:
                d["region"] = src["region"]
                # TODO(boris-42): Data is validated only by ES, which is bad
                lines.append('{"index": {}}')
                lines.append(json.dumps(d))
            # The Elasticsearch bulk API requires the payload to end with a
            # newline; "\n".join() alone drops the final one and the last
            # document would be rejected.
            req_data = "\n".join(lines) + "\n"
            logging.info("Sending data from chunk %s to backend", i)
            try:
                # Newer Elasticsearch versions reject bulk requests that do
                # not declare an NDJSON content type.
                r = requests.post(
                    "%s/_bulk" % backend_url, data=req_data,
                    headers={"Content-Type": "application/x-ndjson"})
            except requests.exceptions.RequestException:
                logging.error("Was unable to store data for %s "
                              "Stopping current job run", data_interval)
                break
            logging.debug(r.text)
    logging.info("Syncing job completed in %.3f seconds",
                 time.time() - started_at)
def main():
    """Entry point: prepare backend storage, then run the sync loop forever.

    Ensures an Elastic index exists for every configured source region,
    schedules :func:`job` at the configured interval, runs it once
    immediately, and then services the scheduler until the process is
    killed.
    """
    conf = config.get_config()

    # Make sure every region has its Elastic index before the first sync.
    for source in conf["sources"]:
        storage.ensure_index_exists(source["region"])

    # Register the periodic aggregation job (default: every minute).
    interval = conf.get("config", {}).get("run_every_minutes", 1)
    schedule.every(interval).minutes.do(job)

    # Kick off one run right away instead of waiting a full interval.
    job()
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()