From a13cbc6d3c75d2a692f24acbe8d7aaafdfe47aa4 Mon Sep 17 00:00:00 2001 From: Kirill Zaitsev Date: Fri, 16 Dec 2016 18:08:29 +0300 Subject: [PATCH] Rename mapping/es to storage and start using elasticsearch lib This commit renames mapping.es.py to storage.py to unify things with other projects. This also adds initial usage of elasticsearch library client and expands config to allow old and new code to coexist. Closes #42 Related #37 --- etc/config.json | 3 +- health/config.py | 21 ++++++++-- health/job.py | 4 +- health/mapping/README.rst | 5 --- health/mapping/__init__.py | 0 health/{mapping/es.py => storage.py} | 60 +++++++++++++++++--------- requirements.txt | 1 + tests/unit/api/v1/etc/config.json | 3 +- tests/unit/mapping/__init__.py | 0 tests/unit/mapping/test_es.py | 63 ---------------------------- 10 files changed, 64 insertions(+), 96 deletions(-) delete mode 100644 health/mapping/README.rst delete mode 100644 health/mapping/__init__.py rename health/{mapping/es.py => storage.py} (62%) delete mode 100644 tests/unit/mapping/__init__.py delete mode 100644 tests/unit/mapping/test_es.py diff --git a/etc/config.json b/etc/config.json index ace009d..69b5bed 100644 --- a/etc/config.json +++ b/etc/config.json @@ -15,7 +15,8 @@ ], "backend": { "elastic": "http://4.3.2.1:9200/", - "elastic_index": "ms_health_idx_1" + "type": "elastic", + "connection": [{"host": "127.0.0.1", "port": 9200}] }, "config": { "run_every_minutes": 2 diff --git a/health/config.py b/health/config.py index df16b9f..d944e58 100644 --- a/health/config.py +++ b/health/config.py @@ -39,6 +39,8 @@ ], "backend": { "elastic": "http://127.0.0.1:9200/", + "type": "elastic", + "connection": [{"host": "127.0.0.1", "port": 9200}] }, "config": { "run_every_minutes": 2 @@ -75,11 +77,22 @@ "backend": { "type": "object", "properties": { - "elastic": { - "type": "string" - } + "elastic": {"type": "string"}, + "type": {"type": "string"}, + "connection": { + "type": "array", + "items": { + "type": "object", + "properties": { + "host": {"type": "string"}, + "port": {"type": "integer"} + }, + "required": ["host"] + }, + "minItems": 1 + }, }, - "required": ["elastic"] + "required": ["elastic", "type", "connection"] }, "config": { "type": "object", diff --git a/health/job.py b/health/job.py index 40fd1b5..25b1d17 100644 --- a/health/job.py +++ b/health/job.py @@ -24,7 +24,7 @@ from health import config from health.drivers import utils -from health.mapping import es +from health import storage LOGGING_FORMAT = '[%(asctime)s] %(levelname)s in %(module)s: %(message)s' @@ -103,7 +103,7 @@ def main(): # Init Elastic index in backend for src in CONF["sources"]: - es.ensure_index_exists(CONF["backend"]["elastic"], src["region"]) + storage.ensure_index_exists(src["region"]) # Setup periodic job that does aggregation magic run_every_min = CONF.get("config", {}).get("run_every_minutes", 1) diff --git a/health/mapping/README.rst b/health/mapping/README.rst deleted file mode 100644 index d57a633..0000000 --- a/health/mapping/README.rst +++ /dev/null @@ -1,5 +0,0 @@ -ES Index Mappping -================= - -We should be strict as possible with what data is put to ElasticSearch -so we are using explicit Indexes. diff --git a/health/mapping/__init__.py b/health/mapping/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/health/mapping/es.py b/health/storage.py similarity index 62% rename from health/mapping/es.py rename to health/storage.py index dfcdd22..be5c281 100644 --- a/health/mapping/es.py +++ b/health/storage.py @@ -13,11 +13,13 @@ # License for the specific language governing permissions and limitations # under the License. -import json import logging import sys -import requests +import elasticsearch + +from health import config + _http_codes = { "type": "object", @@ -52,7 +54,7 @@ } } -mapping = { +ES_MAPPINGS = { "settings": { "number_of_shards": 5 }, @@ -73,28 +75,46 @@ } } -existing_indices = set() +ES_CLIENT = None + + +def get_elasticsearch(): + """Configures or returns already configured ES client.""" + global ES_CLIENT + if not ES_CLIENT: + nodes = config.get_config()["backend"]["connection"] + ES_CLIENT = elasticsearch.Elasticsearch(nodes) + return ES_CLIENT -def ensure_index_exists(es, region): +EXISTING_INDICES = set() + + +def ensure_index_exists(region): index_to_create = "ms_health_%s" % region - if index_to_create in existing_indices: + if index_to_create in EXISTING_INDICES: return - r = requests.get("%s/%s" % (es, index_to_create)) - - if not r.ok: - r = requests.put("%s/%s" % (es, index_to_create), - data=json.dumps(mapping)) - if r.ok: - logging.info("Index '{}' created successfully".format( - index_to_create)) - existing_indices.add(index_to_create) - else: - logging.error("Got {} status when creating index '{}'. {}".format( - r.status_code, index_to_create, r.text)) - sys.exit(1) + es = get_elasticsearch() + + if not es.indices.exists(index_to_create): + try: + es.indices.create(index_to_create, ES_MAPPINGS) + logging.info("Created '{}' index".format(index_to_create)) + except elasticsearch.ElasticsearchException as e: + if e.error == "index_already_exists_exception": + # we might catch already exists here if 2 jobs are strted + # concurrently. It's ok. + logging.info( + "Index {} already exists".format(index_to_create)) + EXISTING_INDICES.add(index_to_create) + else: + logging.error( + "Got {} error when creating index '{}'.".format( + e, index_to_create)) + sys.exit(1) + EXISTING_INDICES.add(index_to_create) else: - existing_indices.add(index_to_create) logging.info("Index {} already exists".format(index_to_create)) + EXISTING_INDICES.add(index_to_create) diff --git a/requirements.txt b/requirements.txt index 65b0ed8..af208c8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,6 @@ Flask==0.11.1 requests schedule +elasticsearch==5.0.1 jsonschema>=2.0.0,!=2.5.0,<3.0.0 # MIT flask-helpers diff --git a/tests/unit/api/v1/etc/config.json b/tests/unit/api/v1/etc/config.json index 89ecf8b..5687fd4 100644 --- a/tests/unit/api/v1/etc/config.json +++ b/tests/unit/api/v1/etc/config.json @@ -10,7 +10,8 @@ ], "backend": { "elastic": "http://4.3.2.1:9200/", - "elastic_index": "ms_health_idx_1" + "type": "elastic", + "connection": [{"host": "4.3.2.1", "port": 9200}] }, "config": { "run_every_minutes": 2 diff --git a/tests/unit/mapping/__init__.py b/tests/unit/mapping/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/unit/mapping/test_es.py b/tests/unit/mapping/test_es.py deleted file mode 100644 index a75148d..0000000 --- a/tests/unit/mapping/test_es.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright 2016: Mirantis Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock - -from health.mapping import es -from tests.unit import test # noqa - - -class InitElasticTestCase(test.TestCase): - - def setUp(self): - super(InitElasticTestCase, self).setUp() - es.existing_indices = set() - - @mock.patch("requests.api.request") - def test_init_elastic_index_exists(self, mock_request): - mock_request.return_value.status_code = 200 - mock_request.return_value.ok = True - es.ensure_index_exists("fake-es", "regionOne") - self.assertEqual(1, mock_request.call_count) - calls = [mock.call("get", "fake-es/ms_health_regionOne", - allow_redirects=True, params=None)] - - mock_request.assert_has_calls(calls) - - @mock.patch("requests.api.request") - def test_init_elastic_create_index(self, mock_request): - mock_request.side_effect = [ - mock.Mock(status_code=404, ok=False), - mock.Mock(status_code=200, ok=True) - ] - es.ensure_index_exists("fake-es", "regionOne") - calls = [mock.call("get", "fake-es/ms_health_regionOne", - allow_redirects=True, params=None), - mock.call("put", "fake-es/ms_health_regionOne", - data=mock.ANY)] - self.assertEqual(2, mock_request.call_count) - mock_request.assert_has_calls(calls) - - @mock.patch("health.mapping.es.sys") - @mock.patch("requests.api.request") - def test_init_elastic_exit_if_failed(self, mock_request, mock_sys): - mock_request.side_effect = [ - mock.Mock(status_code=404, ok=False), - mock.Mock(status_code=400, ok=False) - ] - - es.ensure_index_exists("fake-es", "regionOne") - self.assertEqual(2, mock_request.call_count) - mock_sys.exit.assert_called_once_with(1)