This repository has been archived by the owner on Aug 2, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 80
/
DestinationHttpClient.java
171 lines (147 loc) · 6.97 KB
/
DestinationHttpClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazon.opendistroforelasticsearch.alerting.destination.client;
import com.amazon.opendistroforelasticsearch.alerting.destination.message.BaseMessage;
import com.amazon.opendistroforelasticsearch.alerting.destination.message.CustomWebhookMessage;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
 * Delivers destination messages over HTTP.
 *
 * <p>Every message is sent as an HTTP POST through a single pooled
 * {@link CloseableHttpClient} that is held in a static field and therefore
 * shared by all instances of this class. Only a {@code 200 OK} response is
 * treated as success; any other status code causes an {@link IOException}.
 */
public class DestinationHttpClient {

    private static final Logger logger = LogManager.getLogger(DestinationHttpClient.class);

    /** Upper bound on connections held by the pool across all routes. */
    private static final int MAX_CONNECTIONS = 60;
    /** Upper bound on pooled connections per route. */
    private static final int MAX_CONNECTIONS_PER_ROUTE = 20;
    /** Used for both the connect timeout and the connection-request (pool lease) timeout. */
    private static final int TIMEOUT_MILLISECONDS = (int) TimeValue.timeValueSeconds(5).millis();
    /** Socket (read) timeout. */
    private static final int SOCKET_TIMEOUT_MILLISECONDS = (int) TimeValue.timeValueSeconds(50).millis();

    // Intentionally not final: setHttpClient(...) swaps it out so tests can inject a mock.
    private static CloseableHttpClient HTTP_CLIENT = createHttpClient();

    /**
     * Builds the default client: pooled connections, the timeouts declared above,
     * the default retry handler, and JVM system properties honoured by the builder.
     *
     * @return a newly built client
     */
    private static CloseableHttpClient createHttpClient() {
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(TIMEOUT_MILLISECONDS)
                .setConnectionRequestTimeout(TIMEOUT_MILLISECONDS)
                .setSocketTimeout(SOCKET_TIMEOUT_MILLISECONDS)
                .build();

        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(MAX_CONNECTIONS);
        connectionManager.setDefaultMaxPerRoute(MAX_CONNECTIONS_PER_ROUTE);

        return HttpClientBuilder.create()
                .setDefaultRequestConfig(config)
                .setConnectionManager(connectionManager)
                .setRetryHandler(new DefaultHttpRequestRetryHandler())
                .useSystemProperties()
                .build();
    }

    /**
     * Posts the message to its destination and returns the response body.
     *
     * @param message the message to deliver; its content becomes the request body
     * @return the response body, or {@code "{}"} when the response carries no entity
     * @throws IOException if the destination answers with a status other than 200
     * @throws IllegalStateException if the destination URI cannot be built
     * @throws Exception any other failure raised while building or executing the request
     */
    public String execute(BaseMessage message) throws Exception {
        CloseableHttpResponse response = null;
        try {
            response = getHttpResponse(message);
            validateResponseStatus(response);
            return getResponseString(response);
        } finally {
            if (response != null) {
                // Drain any unread content (e.g. when validation failed before the body
                // was read) without letting a secondary IOException mask the original
                // outcome. Per the HttpClient documentation, a fully consumed entity
                // lets the underlying connection go back to the pool.
                EntityUtils.consumeQuietly(response.getEntity());
            }
        }
    }

    /**
     * Builds the POST request for the message and executes it on the shared client.
     *
     * <p>For a {@link CustomWebhookMessage} the URI comes from its url, or from its
     * scheme/host/port/path/query parameters when the url is empty, and its header
     * parameters are applied; when it supplies none, {@code Content-Type: application/json}
     * is set. For any other message only the trimmed url is used and no header is
     * set explicitly.
     *
     * @param message the message to send
     * @return the raw response; the caller is responsible for consuming it
     * @throws Exception if the request cannot be built or executed
     */
    private CloseableHttpResponse getHttpResponse(BaseMessage message) throws Exception {
        HttpPost httpPostRequest = new HttpPost();
        URI uri;

        if (message instanceof CustomWebhookMessage) {
            CustomWebhookMessage customWebhookMessage = (CustomWebhookMessage) message;
            uri = buildUri(customWebhookMessage.getUrl(), customWebhookMessage.getScheme(),
                    customWebhookMessage.getHost(), customWebhookMessage.getPort(),
                    customWebhookMessage.getPath(), customWebhookMessage.getQueryParams());

            Map<String, String> headerParams = customWebhookMessage.getHeaderParams();
            if (headerParams == null || headerParams.isEmpty()) {
                // No caller-supplied headers: fall back to the default content type.
                httpPostRequest.setHeader("Content-Type", "application/json");
            } else {
                for (Map.Entry<String, String> header : headerParams.entrySet()) {
                    httpPostRequest.setHeader(header.getKey(), header.getValue());
                }
            }
        } else {
            uri = buildUri(message.getUrl().trim(), null, null, -1, null, null);
        }

        httpPostRequest.setURI(uri);
        httpPostRequest.setEntity(new StringEntity(extractBody(message), StandardCharsets.UTF_8));
        return HTTP_CLIENT.execute(httpPostRequest);
    }

    /**
     * Resolves the target URI.
     *
     * <p>A non-empty {@code endpoint} is parsed as-is and every other argument is
     * ignored. Otherwise the URI is assembled from the individual parts, with the
     * scheme defaulting to {@code https} when none is given.
     *
     * @param endpoint    full URL, may be null or empty
     * @param scheme      URI scheme for the fallback form, may be null or empty
     * @param host        host for the fallback form
     * @param port        port for the fallback form
     * @param path        path for the fallback form
     * @param queryParams query parameters for the fallback form, may be null
     * @return the URI to post to
     * @throws IllegalStateException if the inputs do not form a syntactically valid URI
     */
    private URI buildUri(String endpoint, String scheme, String host,
                         int port, String path, Map<String, String> queryParams) {
        try {
            if (!Strings.isNullOrEmpty(endpoint)) {
                return new URIBuilder(endpoint).build();
            }

            logger.info("endpoint empty. Fall back to host:port/path");
            String effectiveScheme = Strings.isNullOrEmpty(scheme) ? "https" : scheme;

            URIBuilder uriBuilder = new URIBuilder();
            if (queryParams != null) {
                for (Map.Entry<String, String> param : queryParams.entrySet()) {
                    uriBuilder.addParameter(param.getKey(), param.getValue());
                }
            }
            return uriBuilder.setScheme(effectiveScheme).setHost(host).setPort(port).setPath(path).build();
        } catch (URISyntaxException exception) {
            // Report why and where parsing failed, but not the input itself and not the
            // exception (its message embeds the input): destination URLs may carry
            // credentials such as webhook tokens, which should not end up in logs.
            logger.error("Error occured while building Uri: {} (index {})",
                    exception.getReason(), exception.getIndex());
            throw new IllegalStateException("Error creating URI");
        }
    }

    /**
     * Reads the response body as text.
     *
     * @param response the response to read
     * @return the body, or {@code "{}"} when the response has no entity
     * @throws IOException if reading the entity content fails
     */
    public String getResponseString(CloseableHttpResponse response) throws IOException {
        HttpEntity entity = response.getEntity();
        if (entity == null) {
            return "{}";
        }

        // UTF-8 is only the fallback: a charset declared by the response still wins.
        String responseString = EntityUtils.toString(entity, StandardCharsets.UTF_8);
        logger.debug("Http response: {}", responseString);
        return responseString;
    }

    /**
     * Rejects every response whose status code is not 200.
     *
     * @param response the response to check
     * @throws IOException if the status code differs from {@code RestStatus.OK}
     */
    private void validateResponseStatus(HttpResponse response) throws IOException {
        int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode != RestStatus.OK.getStatus()) {
            throw new IOException("Failed: " + response);
        }
    }

    /**
     * Returns the text that is sent as the request body.
     *
     * @param message the message being delivered
     * @return the message content
     */
    private String extractBody(BaseMessage message) {
        return message.getMessageContent();
    }

    /**
     * Replaces the HTTP client; useful for mocking the client in tests.
     *
     * <p>The client is stored in a static field, so the replacement is seen by
     * every instance of this class, not only the one this method is called on.
     *
     * @param httpClient the client to use for subsequent requests
     */
    public void setHttpClient(CloseableHttpClient httpClient) {
        HTTP_CLIENT = httpClient;
    }
}