-
-
Notifications
You must be signed in to change notification settings - Fork 170
/
es_mapping
executable file
·122 lines (105 loc) · 3.37 KB
/
es_mapping
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python
"""Create a custom NGram analyzer for the default mapping."""
import logging
import click
from elasticsearch import Elasticsearch, helpers
from pgsync.settings import ELASTICSEARCH_TIMEOUT, ELASTICSEARCH_VERIFY_CERTS
from pgsync.urls import get_elasticsearch_url
from pgsync.utils import config_loader, get_config, timeit
logger = logging.getLogger(__name__)
# Index settings fragment defining a custom n-gram analyzer:
# tokens are split into 3- to 10-character grams and lowercased,
# which enables substring / partial matching on text fields.
NGRAM_ANALYZER = {
    "analysis": {
        "analyzer": {
            "ngram_analyzer": {
                # normalize case before the grams are indexed
                "filter": [
                    "lowercase",
                ],
                "type": "custom",
                "tokenizer": "ngram_tokenizer",
            },
        },
        "tokenizer": {
            "ngram_tokenizer": {
                # character classes allowed inside a token; anything
                # else (e.g. whitespace) acts as a token boundary
                "token_chars": [
                    "letter",
                    "digit",
                    "punctuation",
                    "symbol",
                ],
                "min_gram": "3",
                "type": "ngram",
                "max_gram": "10",
            },
        },
    },
    # must be >= (max_gram - min_gram) or Elasticsearch rejects the settings
    "max_ngram_diff": 10,
}
def apply_analyzer_to_mapping(mapping: dict, analyzer: dict) -> dict:
    """Recursively register *analyzer* as an ``ngram`` sub-field.

    Walks every nested dict inside *mapping*; wherever a ``fields``
    entry is found, *analyzer* is attached under the ``ngram`` key.
    Returns a new top-level dict (nested dicts are modified in place).
    """
    result: dict = {}
    for name, node in mapping.items():
        if not isinstance(node, dict):
            # scalar leaf (type names, flags, ...) — copy through untouched
            result[name] = node
            continue
        if "fields" in node:
            node["fields"]["ngram"] = analyzer
        result[name] = apply_analyzer_to_mapping(node, analyzer)
    return result
def get_configuration(es, index: str) -> dict:
    """Build the settings + mappings payload used to recreate *index*.

    Reads the live settings and mapping of *index*, strips attributes
    Elasticsearch refuses at index-creation time, injects the ngram
    analyzer into every ``fields`` entry of the mapping, merges the
    ``NGRAM_ANALYZER`` analysis settings, and returns the combined
    configuration dict suitable for ``indices.create(body=...)``.

    :param es: connected Elasticsearch client
    :param index: name of the existing index to read from
    """
    configuration: dict = es.indices.get_settings(index)[index]
    # These attributes are generated per-index by Elasticsearch and are
    # rejected when supplied on index creation.  Use a pop() default so
    # a key absent in this ES version doesn't raise KeyError.
    for key in (
        "uuid",
        "version",
        "provided_name",
        "creation_date",
    ):
        configuration["settings"]["index"].pop(key, None)
    mapping: dict = es.indices.get_mapping(index)
    analyzer_mapping: dict = apply_analyzer_to_mapping(
        mapping,
        {
            "analyzer": "ngram_analyzer",
            "search_analyzer": "ngram_analyzer",
            # fielddata is required to aggregate/sort on the text field
            "fielddata": True,
            "type": "text",
        },
    )
    configuration.update(**analyzer_mapping[index])
    configuration["settings"]["index"].update(**NGRAM_ANALYZER)
    return configuration
@timeit
def create_es_mapping(index: str) -> None:
    """Rebuild *index* with the custom ngram analyzer applied.

    Analyzers cannot be changed on a live Elasticsearch index, so the
    data is round-tripped: documents are reindexed into a temporary
    index, the original index is deleted and recreated with the ngram
    configuration, documents are copied back, and the temporary index
    is dropped.

    NOTE(review): documents written between the two reindex passes are
    lost, and a failure mid-way can leave only ``tmp_index`` populated.
    """
    logger.debug(f"Create Elasticsearch mapping for index {index}")
    url: str = get_elasticsearch_url()
    es: Elasticsearch = Elasticsearch(
        hosts=[url],
        timeout=ELASTICSEARCH_TIMEOUT,
        verify_certs=ELASTICSEARCH_VERIFY_CERTS,
    )
    tmp_index: str = "tmp_index"
    # start from a clean slate; ignore "not found"/bad-request responses
    es.indices.delete(index=tmp_index, ignore=[400, 404])
    es.indices.refresh()
    # stage: copy the existing documents into the temporary index
    configuration: dict = get_configuration(es, index)
    es.indices.create(index=tmp_index, body=configuration)
    helpers.reindex(es, index, tmp_index)
    es.indices.refresh()
    # recreate the original index (now with the ngram analyzer baked in)
    # and copy the documents back from the temporary index
    es.indices.delete(index=index)
    es.indices.refresh()
    configuration: dict = get_configuration(es, tmp_index)
    es.indices.create(index=index, body=configuration)
    helpers.reindex(es, tmp_index, index)
    es.indices.delete(index=tmp_index)
    es.indices.refresh()
@click.command()
@click.option(
    "--config",
    "-c",
    help="Schema config",
    type=click.Path(exists=True),
)
def main(config):
    """Create custom NGram analyzer for the default mapping."""
    config: str = get_config(config)
    # set comprehension de-duplicates index names across schema documents
    for index in {doc["index"] for doc in config_loader(config)}:
        create_es_mapping(index)
# Entry point when executed as a standalone script.
if __name__ == "__main__":
    main()