-
Notifications
You must be signed in to change notification settings - Fork 52
/
PromSinkFactory.java
80 lines (67 loc) · 3.56 KB
/
PromSinkFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package io.odpf.firehose.sink.prometheus;
import io.odpf.depot.metrics.StatsDReporter;
import io.odpf.firehose.config.PromSinkConfig;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
import io.odpf.firehose.sink.AbstractSink;
import io.odpf.firehose.sink.prometheus.request.PromRequest;
import io.odpf.firehose.sink.prometheus.request.PromRequestCreator;
import io.odpf.stencil.client.StencilClient;
import io.odpf.stencil.Parser;
import org.aeonbits.owner.ConfigFactory;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import java.util.Map;
/**
* Factory class to create the Prometheus Sink.
* The consumer framework would reflectively instantiate this factory
* using the configurations supplied and invoke
* {@see #create(Map < String, String > configuration, StatsDReporter statsDReporter, StencilClient stencilClient)}
* to obtain the Prometheus sink implementation.
*/
public class PromSinkFactory {
/**
* Create Prometheus sink.
*
* @param configuration the configuration
* @param statsDReporter the statsd reporter
* @param stencilClient the stencil client
* @return PromSink
*/
public static AbstractSink create(Map<String, String> configuration, StatsDReporter statsDReporter, StencilClient stencilClient) {
PromSinkConfig promSinkConfig = ConfigFactory.create(PromSinkConfig.class, configuration);
String promSchemaProtoClass = promSinkConfig.getInputSchemaProtoClass();
FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, PromSinkFactory.class);
CloseableHttpClient closeableHttpClient = newHttpClient(promSinkConfig);
firehoseInstrumentation.logInfo("HTTP connection established");
Parser protoParser = stencilClient.getParser(promSchemaProtoClass);
PromRequest request = new PromRequestCreator(statsDReporter, promSinkConfig, protoParser).createRequest();
return new PromSink(new FirehoseInstrumentation(statsDReporter, PromSink.class),
request,
closeableHttpClient,
stencilClient,
promSinkConfig.getSinkPromRetryStatusCodeRanges(),
promSinkConfig.getSinkPromRequestLogStatusCodeRanges()
);
}
/**
* create a new http client.
*
* @param promSinkConfig the prometheus sink configuration
* @return CloseableHttpClient
*/
private static CloseableHttpClient newHttpClient(PromSinkConfig promSinkConfig) {
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(promSinkConfig.getSinkPromRequestTimeoutMs())
.setConnectionRequestTimeout(promSinkConfig.getSinkPromRequestTimeoutMs())
.setConnectTimeout(promSinkConfig.getSinkPromRequestTimeoutMs()).build();
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
if (promSinkConfig.getSinkPromMaxConnections() != null && promSinkConfig.getSinkPromMaxConnections() > 0) {
connectionManager.setMaxTotal(promSinkConfig.getSinkPromMaxConnections());
connectionManager.setDefaultMaxPerRoute(promSinkConfig.getSinkPromMaxConnections());
}
HttpClientBuilder builder = HttpClients.custom().setConnectionManager(connectionManager).setDefaultRequestConfig(requestConfig);
return builder.build();
}
}