forked from giantswarm/curator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
curator.py
119 lines (95 loc) · 3.53 KB
/
curator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# coding: utf8
from datetime import date
from datetime import timedelta
from elasticsearch import Elasticsearch
import json
import os
import sys
# Runtime settings come from the environment; each one falls back to the
# default given here when the variable is not set.
# INDEX_NAME_PREFIX may list several prefixes separated by whitespace.
elasticsearch_host = os.environ.get("ELASTICSEARCH_HOST", "elasticsearch:9200")
retention_days = int(os.environ.get("RETENTION_DAYS", "14"))
index_name_prefix = os.environ.get("INDEX_NAME_PREFIX", "fluentd- gslogs-")
index_name_timeformat = os.environ.get("INDEX_NAME_TIMEFORMAT", "%Y.%m.%d")
def log(level="info", message="", extra=None):
    """
    Prints a JSON-formatted log message

    The record is written to stdout as a single JSON object with the keys
    "level" and "message", plus "extra" when extra is given.

    level -- severity label stored under "level"
    message -- text stored under "message"
    extra -- optional additional data; left out of the record when None.
        Values that json cannot encode natively (exception objects, for
        example) are emitted as their str() form.
    """
    record = {
        "level": level,
        "message": message
    }
    if extra is not None:
        record["extra"] = extra
    # default=str keeps the logger from raising TypeError on values that are
    # not JSON types, such as an exception object placed in extra.
    print(json.dumps(record, default=str))
def get_valid_indices(nameprefix, retention_days, timeformat):
    """
    Returns a set of valid index names to keep

    One name is built for today and for each of the preceding
    retention_days - 1 days, as nameprefix followed by that day formatted
    with timeformat.

    nameprefix -- literal index name prefix; it is never interpreted as a
        strftime pattern, so a '%' in it is preserved as-is
    retention_days -- number of days, counting today, to produce names for;
        values below 1 yield an empty set
    timeformat -- strftime pattern for the date part of the name

    The set can hold fewer than retention_days names when timeformat maps
    several days to the same string (e.g. a month-only pattern).
    """
    # Read the clock once so every name is derived from the same day.
    today = date.today()
    return {
        nameprefix + (today - timedelta(days=age)).strftime(timeformat)
        for age in range(retention_days)
    }
def main():
    """
    Deletes expired indices for every configured index name prefix

    Reads the module-level settings (elasticsearch_host, retention_days,
    index_name_prefix, index_name_timeformat), validates them, and then
    processes each whitespace-separated prefix in turn.

    Exits with status 1 when a setting is invalid, when the client cannot
    be created, or when listing indices fails. A failure to delete a single
    index is logged and does not stop the run.
    """
    # Initial validation
    if retention_days < 1:
        log("error", "Retention period in days is too short (RETENTION_DAYS=%d)" % retention_days)
        sys.exit(1)

    # index name prefixes from space-separated string; a value made up of
    # whitespace only yields no prefixes and is rejected like an empty one
    prefixes = index_name_prefix.split()
    if not prefixes:
        log("error", "Index name prefix is empty (INDEX_NAME_PREFIX='')")
        sys.exit(1)

    if index_name_timeformat == "":
        log("error", "Index name time format is empty (INDEX_NAME_TIMEFORMAT='')")
        sys.exit(1)

    if elasticsearch_host == "":
        log("error", "Elasticsearch host is empty (ELASTICSEARCH_HOST='')")
        sys.exit(1)

    # One client is shared by all prefixes.
    try:
        es = Elasticsearch([elasticsearch_host])
    except Exception as e:
        # str(e): the exception object itself is not JSON-serializable
        log("error", "Could not connect to elasticsearch", extra={
            "exception": str(e)
        })
        sys.exit(1)

    for prefix in prefixes:
        _remove_expired_indices(es, prefix)


def _remove_expired_indices(es, prefix):
    """
    Deletes the indices matching prefix* that are not in the retention set

    es -- Elasticsearch client used for listing and deleting
    prefix -- a single index name prefix

    Every index returned for the pattern prefix* whose name is not one of
    the names produced by get_valid_indices is deleted.
    """
    log("info", "Removing indices with name format '{prefix}{timeformat}' older than {days} days from host '{host}'".format(
        prefix=prefix,
        timeformat=index_name_timeformat,
        days=retention_days,
        host=elasticsearch_host,
    ))

    # Create a set of names with index names that should be kept for now
    valid = get_valid_indices(prefix, retention_days, index_name_timeformat)
    if len(valid) == 0:
        # Refuse to continue: with nothing to keep, every match would go.
        log("error", "The current index name settings yield no index names to retain")
        sys.exit(1)

    searchterm = prefix + "*"
    try:
        # Keyword argument: newer clients do not accept it positionally.
        indices = es.indices.get(index=searchterm)
    except Exception as e:
        log("error", "Could not list indices for '%s'" % searchterm, extra={
            "exception": str(e)
        })
        sys.exit(1)

    if len(indices) == 0:
        # Nothing to do for this prefix; the caller moves on to the next.
        log("info", "No indices found")
        return

    for index in indices:
        if index in valid:
            continue
        try:
            es.indices.delete(index=index)
        except Exception as e:
            # Best effort: report the failure and try the remaining indices.
            log("error", "Error deleting index '%s'" % index, extra={
                "exception": str(e)
            })
        else:
            log("info", "Deleted index %s" % index)
# Run the cleanup only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()