-
Notifications
You must be signed in to change notification settings - Fork 17
/
ElasticsearchHelper.java
180 lines (163 loc) · 6.61 KB
/
ElasticsearchHelper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package it.okkam.flink;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.IndexNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class manages the creation of index templates and index mapping on elasticsearch.
*
* <p>Example:
*
* <pre>
* {
* @code
* ElasticSearchHelper esHelper = new ElasticSearchHelper(config, transports);
*
* // Create an Index Template given a name and the json structure
* esHelper.initTemplate(templateName, templateRequest);
*
* // Create an Index Mapping given the Index Name, DocType and the json structure
* esHelper.initIndexMapping(indexName, docType, mappingsRequest);
*
* }
* </pre>
*
* <p>The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
* {@link TransportClient}. The config keys can be found in the Elasticsearch documentation. An
* important setting is {@code cluster.name}, this should be set to the name of the cluster that the
* sink should emit to.
*
*/
public class ElasticsearchHelper {
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchHelper.class);
private Client client;
private static final int DEFAULT_INDEX_SHARDS = 2;
private static final int DEFAULT_INDEX_REPLICAS = 0;
/**
* Creates a new ElasticSearchHelper that connects to the cluster using a TransportClient.
*
* @param userConfig The map of user settings that are passed when constructing the
* TransportClients
* @param transportAddresses The Elasticsearch Nodes to which to connect using a
* {@code TransportClient}
*/
public ElasticsearchHelper(Map<String, String> userConfig,
List<InetSocketAddress> transportAddresses) {
client = buildElasticsearchClient(userConfig, transportAddresses);
}
/**
* Build a TransportClient to connect to the cluster.
*
* @param userConfig The map of user settings that are passed when constructing the
* TransportClients
* @param transportAddresses The Elasticsearch Nodes to which to connect using a
* {@code TransportClient}
* @return Initialized TransportClient
*/
public static Client buildElasticsearchClient(Map<String, String> userConfig,
List<InetSocketAddress> transportAddresses) {
List<TransportAddress> transportNodes;
transportNodes = new ArrayList<>(transportAddresses.size());
for (InetSocketAddress address : transportAddresses) {
transportNodes.add(new InetSocketTransportAddress(address));
}
Settings settings = Settings.settingsBuilder().put(userConfig).build();
TransportClient transportClient = TransportClient.builder().settings(settings).build();
for (TransportAddress transport : transportNodes) {
transportClient.addTransportAddress(transport);
}
// verify that we actually are connected to a cluster
ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes());
if (nodes.isEmpty()) {
throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
}
return transportClient;
}
/**
* Create a new index template.
*
* @param templateName Name of the template to create
* @param templateReq Json defining the index template
*/
public void initTemplate(String templateName, String templateReq) throws Exception {
// Check if the template is set
if (templateReq != null && !templateReq.equals("")) {
// Json deserialization
PutIndexTemplateRequest request =
new PutIndexTemplateRequest(templateName).source(templateReq);
// Sending the request to elastic search
sendIndexTemplateRequest(request);
}
}
/**
* Send the index template request to elasticsearch.
*
* @param indexTemplateRequest A valid index template request
*/
public void sendIndexTemplateRequest(PutIndexTemplateRequest indexTemplateRequest)
throws Exception {
// Check if the template is set
if (indexTemplateRequest != null) {
// Sending the request to elastic search
client.admin().indices().putTemplate(indexTemplateRequest).get();
}
}
/**
* Create a new mapping for a document type for an index.
*
* @param indexName Index name where add the mapping
* @param docType Document type of the mapping
* @param mappingReq Json defining the index mapping
*/
public void initIndexMapping(String indexName, String docType, String mappingReq)
throws Exception {
PutMappingRequest request = new PutMappingRequest(indexName).source(mappingReq).type(docType);
// Put the mapping to the index
sendIndexMappingRequest(request);
LOG.debug("Updating mappings...");
}
/**
* Send the index mapping request to elasticsearch.
*
* @param mappingRequest A valid index mapping request
*/
public void sendIndexMappingRequest(PutMappingRequest mappingRequest) throws Exception {
// Check if the template is set
if (mappingRequest != null) {
try {
// Check if the index exists
SearchResponse response =
client.prepareSearch(mappingRequest.indices()).setTypes(mappingRequest.type()).get();
if (response != null) {
LOG.debug("Index found, no need to create it...");
}
} catch (IndexNotFoundException infe) {
for (String indexName : mappingRequest.indices()) {
// If the index does not exist, create it
client.admin().indices().prepareCreate(indexName)
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, DEFAULT_INDEX_SHARDS)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, DEFAULT_INDEX_REPLICAS))
.execute().actionGet();
LOG.info("Index " + indexName + " not found, creating it...");
}
}
// Sending the request to elastic search
client.admin().indices().putMapping(mappingRequest).get();
}
}
}