Skip to content

Commit

Permalink
Use oss_lib.config to dial with configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
akscram committed Dec 23, 2016
1 parent 7c37620 commit 9443a80
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 166 deletions.
8 changes: 5 additions & 3 deletions health/api/v1/health_.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import flask
import requests

from health import config
from oss_lib import config

CONF = config.CONF

health = flask.Blueprint("health", __name__)

Expand Down Expand Up @@ -118,7 +120,7 @@ def get_health(region, period):

# only match if region is not "all"

request = config.get_config()["backend"]["elastic"]
request = CONF["backend"]["elastic"]
r = requests.get("%s/ms_health_%s/_search" % (request, region),
data=json.dumps(query))

Expand Down Expand Up @@ -160,7 +162,7 @@ def get_overview(period):
query = get_query(
period, interval, aggs_name="regions", aggs_term="region")

request = config.get_config()["backend"]["elastic"]
request = CONF["backend"]["elastic"]
r = requests.get("%s/ms_health_*/_search" % request,
data=json.dumps(query))

Expand Down
5 changes: 3 additions & 2 deletions health/api/v1/regions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
import flask
import requests

from health import config
from oss_lib import config

CONF = config.CONF

regions = flask.Blueprint("regions", __name__)

Expand All @@ -26,7 +27,7 @@
def list_regions():
"""List region names."""
resp = requests.get("%s/ms_health_*/_mappings"
% config.get_config()["backend"]["elastic"])
% CONF["backend"]["elastic"])
if resp.ok:
return flask.jsonify([name[10:] for name in resp.json()])
else:
Expand Down
43 changes: 43 additions & 0 deletions health/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging

import flask
from oss_lib import routing

from health.api.v1 import health_
from health.api.v1 import regions

LOG = logging.getLogger(__name__)

app = flask.Flask("health", static_folder=None)


@app.errorhandler(404)
def not_found(error):
LOG.warning("Could not find package: %s", error)
return flask.jsonify({"error": "Not Found"}), 404


@app.errorhandler(500)
def handle_500(error):
LOG.error("Unexpected error occured: %s", error)
return flask.jsonify({"error": "Internal Server Error"}), 500


for bp in [health_, regions]:
for url_prefix, blueprint in bp.get_blueprints():
app.register_blueprint(blueprint, url_prefix="/api/v1%s" % url_prefix)


app = routing.add_routing_map(app, html_uri=None, json_uri="/")
127 changes: 39 additions & 88 deletions health/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,110 +13,61 @@
# License for the specific language governing permissions and limitations
# under the License.

import json
import logging
import os
import sys

import jsonschema


CONF = None
DEFAULT_CONF = {
"flask": {
"HOST": "0.0.0.0",
"PORT": 5000,
"DEBUG": False
},
DEFAULT = {
"sources": [
{
"region": "region",
"driver": {
"type": "tcp",
"elastic_src": "http://127.0.0.1:9200/log-*/log"
}
}
"elastic_src": "http://127.0.0.1:9200/log-*/log",
},
},
],
"backend": {
"elastic": "http://127.0.0.1:9200/",
},
"config": {
"run_every_minutes": 2
}
"run_every_minutes": 2,
},
}

CONF_SCHEMA = {
"type": "object",
"$schema": "http://json-schema.org/draft-04/schema",
"properties": {
"flask": {
"type": "object"
},
"sources": {
"type": "array",
"items": {
"type": "object",
"properties": {
"region": {
"type": "string"
SCHEMA = {
"sources": {
"type": "array",
"items": {
"type": "object",
"properties": {
"region": {
"type": "string",
},
"driver": {
"type": "object",
"properties": {
"type": {"type": "string"},
"elastic_src": {"type": "string"},
},
"driver": {
"type": "object",
"properties": {
"type": {"type": "string"},
"elastic_src": {"type": "string"}
},
"required": ["type", "elastic_src"]
}
"required": ["type", "elastic_src"],
},
"required": ["region", "driver"]
}
},
"required": ["region", "driver"],
},
"backend": {
"type": "object",
"properties": {
"elastic": {
"type": "string"
}
},
"backend": {
"type": "object",
"properties": {
"elastic": {
"type": "string"
},
},
"required": ["elastic"],
},
"config": {
"type": "object",
"properties": {
"run_every_minutes": {
"type": "integer",
"minimum": 1,
},
"required": ["elastic"]
},
"config": {
"type": "object",
"properties": {
"run_every_minutes": {
"type": "integer",
"minimum": 1
}
}
}
},
"additionalProperties": False
}


def get_config():
"""Return cached configuration.
:returns: application config
:rtype: dict
"""
global CONF
if not CONF:
path = os.environ.get("HEALTH_CONF", "/etc/health/config.json")
try:
CONF = json.load(open(path))
logging.info("Config is '%s'" % path)
except IOError as e:
logging.warning("Config at '%s': %s" % (path, e))
CONF = DEFAULT_CONF
try:
jsonschema.validate(CONF, CONF_SCHEMA)
except jsonschema.ValidationError as e:
logging.error(e.message)
sys.exit(1)
except jsonschema.SchemaError as e:
logging.error(e)
sys.exit(1)
else:
return CONF
40 changes: 19 additions & 21 deletions health/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,24 @@
import logging
import time

from oss_lib import config
import requests
import schedule

from health import config
from health import config as cfg
from health.drivers import utils
from health.mapping import es


LOGGING_FORMAT = '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
logging.basicConfig(format=LOGGING_FORMAT, level=logging.INFO)
LOG = logging.getLogger(__name__)
CONF = config.CONF


def _get_driver(driver_type):
try:
return importlib.import_module("." + driver_type + ".driver",
"health.drivers").Driver
except ImportError:
logging.error("Could not load driver for '{}'".format(driver_type))
LOG.error("Could not load driver for '{}'".format(driver_type))
raise


Expand All @@ -47,17 +47,16 @@ def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logging.error("Caught {} while running '{}' function".format(
LOG.error("Caught {} while running '{}' function".format(
e, func.__name__))

return wrapper


@ignore_exceptions
def job():
CONF = config.get_config()
started_at = time.time()
logging.info("Starting Syncing Job")
LOG.info("Starting Syncing Job")

for src in CONF["sources"]:
backend_url = "%s/ms_health_%s/service" % (
Expand All @@ -67,13 +66,13 @@ def job():
driver = _get_driver(src["driver"]["type"])(src["driver"])
data_generator = driver.fetch(latest_aggregated_ts=max_ts)

logging.info("Start syncing %s region" % src["region"])
LOG.info("Start syncing %s region" % src["region"])

for i, data_interval in enumerate(data_generator):

if not data_interval:
logging.info("Chunk %s from region %s is already synced."
% (i, src["region"]))
LOG.info("Chunk %s from region %s is already synced.",
i, src["region"])
continue

req_data = []
Expand All @@ -83,38 +82,37 @@ def job():
req_data.append('{"index": {}}')
req_data.append(json.dumps(d))
req_data = "\n".join(req_data)
logging.info("Sending data from chunk {} to backend".format(i))
LOG.info("Sending data from chunk {} to backend".format(i))

try:
r = requests.post("%s/_bulk" % backend_url, data=req_data)
except requests.exceptions.RequestException:
logging.error("Was unable to store data for {} "
LOG.error("Was unable to store data for {} "
"Stopping current job run".format(
data_interval))
break
logging.debug(r.text)
LOG.debug(r.text)

logging.info("Syncing job completed in %.3f seconds"
LOG.info("Syncing job completed in %.3f seconds"
% (time.time() - started_at))


def main():
CONF = config.get_config()
config.process_args("HEALTH",
default_config_path="/etc/health/config.json",
defaults=cfg.DEFAULT,
validation_schema=cfg.SCHEMA)
# Init Elastic index in backend

for src in CONF["sources"]:
es.ensure_index_exists(CONF["backend"]["elastic"], src["region"])

# Setup periodic job that does aggregation magic
run_every_min = CONF.get("config", {}).get("run_every_minutes", 1)
run_every_min = CONF["config"]["run_every_minutes"]
schedule.every(run_every_min).minutes.do(job)

job()

while True:
schedule.run_pending()
time.sleep(1)


if __name__ == "__main__":
main()
Loading

0 comments on commit 9443a80

Please sign in to comment.