Skip to content

Commit

Permalink
Rename mapping/es to storage and start using elasticsearch lib
Browse files Browse the repository at this point in the history
This commit renames mapping.es.py to storage.py to unify things with
other projects. This also adds initial usage of elasticsearch library
client and expands config to allow old and new code to coexist.

Closes #42
Related #37
  • Loading branch information
teferi committed Dec 16, 2016
1 parent a9a6a80 commit a13cbc6
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 96 deletions.
3 changes: 2 additions & 1 deletion etc/config.json
Expand Up @@ -15,7 +15,8 @@
],
"backend": {
"elastic": "http://4.3.2.1:9200/",
"elastic_index": "ms_health_idx_1"
"type": "elastic",
"connection": [{"host": "127.0.0.1", "port": 9200}]
},
"config": {
"run_every_minutes": 2
Expand Down
21 changes: 17 additions & 4 deletions health/config.py
Expand Up @@ -39,6 +39,8 @@
],
"backend": {
"elastic": "http://127.0.0.1:9200/",
"type": "elastic",
"connection": [{"host": "127.0.0.1", "port": 9200}]
},
"config": {
"run_every_minutes": 2
Expand Down Expand Up @@ -75,11 +77,22 @@
"backend": {
"type": "object",
"properties": {
"elastic": {
"type": "string"
}
"elastic": {"type": "string"},
"type": {"type": "string"},
"connection": {
"type": "array",
"items": {
"type": "object",
"properties": {
"host": {"type": "string"},
"port": {"type": "integer"}
},
"required": ["host"]
},
"minItems": 1
},
},
"required": ["elastic"]
"required": ["elastic", "type", "connection"]
},
"config": {
"type": "object",
Expand Down
4 changes: 2 additions & 2 deletions health/job.py
Expand Up @@ -24,7 +24,7 @@

from health import config
from health.drivers import utils
from health.mapping import es
from health import storage


LOGGING_FORMAT = '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
Expand Down Expand Up @@ -103,7 +103,7 @@ def main():
# Init Elastic index in backend

for src in CONF["sources"]:
es.ensure_index_exists(CONF["backend"]["elastic"], src["region"])
storage.ensure_index_exists(src["region"])

# Setup periodic job that does aggregation magic
run_every_min = CONF.get("config", {}).get("run_every_minutes", 1)
Expand Down
5 changes: 0 additions & 5 deletions health/mapping/README.rst

This file was deleted.

Empty file removed health/mapping/__init__.py
Empty file.
60 changes: 40 additions & 20 deletions health/mapping/es.py → health/storage.py
Expand Up @@ -13,11 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.

import json
import logging
import sys

import requests
import elasticsearch

from health import config


_http_codes = {
"type": "object",
Expand Down Expand Up @@ -52,7 +54,7 @@
}
}

mapping = {
ES_MAPPINGS = {
"settings": {
"number_of_shards": 5
},
Expand All @@ -73,28 +75,46 @@
}
}

existing_indices = set()
ES_CLIENT = None


def get_elasticsearch():
"""Configures or returns already configured ES client."""
global ES_CLIENT
if not ES_CLIENT:
nodes = config.get_config()["backend"]["connection"]
ES_CLIENT = elasticsearch.Elasticsearch(nodes)
return ES_CLIENT


def ensure_index_exists(es, region):
EXISTING_INDICES = set()


def ensure_index_exists(region):
index_to_create = "ms_health_%s" % region

if index_to_create in existing_indices:
if index_to_create in EXISTING_INDICES:
return

r = requests.get("%s/%s" % (es, index_to_create))

if not r.ok:
r = requests.put("%s/%s" % (es, index_to_create),
data=json.dumps(mapping))
if r.ok:
logging.info("Index '{}' created successfully".format(
index_to_create))
existing_indices.add(index_to_create)
else:
logging.error("Got {} status when creating index '{}'. {}".format(
r.status_code, index_to_create, r.text))
sys.exit(1)
es = get_elasticsearch()

if not es.indices.exists(index_to_create):
try:
es.indices.create(index_to_create, ES_MAPPINGS)
logging.info("Created '{}' index".format(index_to_create))
except elasticsearch.ElasticsearchException as e:
if e.error == "index_already_exists_exception":
# we might catch already exists here if 2 jobs are strted
# concurrently. It's ok.
logging.info(
"Index {} already exists".format(index_to_create))
EXISTING_INDICES.add(index_to_create)
else:
logging.error(
"Got {} error when creating index '{}'.".format(
e, index_to_create))
sys.exit(1)
EXISTING_INDICES.add(index_to_create)
else:
existing_indices.add(index_to_create)
logging.info("Index {} already exists".format(index_to_create))
EXISTING_INDICES.add(index_to_create)
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -3,5 +3,6 @@ Flask==0.11.1

requests
schedule
elasticsearch==5.0.1
jsonschema>=2.0.0,!=2.5.0,<3.0.0 # MIT
flask-helpers
3 changes: 2 additions & 1 deletion tests/unit/api/v1/etc/config.json
Expand Up @@ -10,7 +10,8 @@
],
"backend": {
"elastic": "http://4.3.2.1:9200/",
"elastic_index": "ms_health_idx_1"
"type": "elastic",
"connection": [{"host": "4.3.2.1", "port": 9200}]
},
"config": {
"run_every_minutes": 2
Expand Down
Empty file removed tests/unit/mapping/__init__.py
Empty file.
63 changes: 0 additions & 63 deletions tests/unit/mapping/test_es.py

This file was deleted.

0 comments on commit a13cbc6

Please sign in to comment.