diff --git a/docs/docs/sinks/prometheus-sink.md b/docs/docs/sinks/prometheus-sink.md index 0c9caffb3..cf3ba49fa 100644 --- a/docs/docs/sinks/prometheus-sink.md +++ b/docs/docs/sinks/prometheus-sink.md @@ -17,6 +17,14 @@ Defines the connection timeout for the request in millis. - Type: `required` - Default value: `10000` +### `SINK_PROM_MAX_CONNECTIONS` + +Defines the maximum number of HTTP connections with Prometheus. + +- Example value: `10` +- Type: `optional` +- Default value: `default no more than 2 concurrent connections per given route and no more 20 connections` + ### `SINK_PROM_RETRY_STATUS_CODE_RANGES` Defines the range of HTTP status codes for which retry will be attempted. diff --git a/src/main/java/io/odpf/firehose/config/PromSinkConfig.java b/src/main/java/io/odpf/firehose/config/PromSinkConfig.java index e62b07a28..8f4bb4bbf 100644 --- a/src/main/java/io/odpf/firehose/config/PromSinkConfig.java +++ b/src/main/java/io/odpf/firehose/config/PromSinkConfig.java @@ -22,6 +22,9 @@ public interface PromSinkConfig extends AppConfig { @DefaultValue("10000") Integer getSinkPromRequestTimeoutMs(); + @Key("SINK_PROM_MAX_CONNECTIONS") + Integer getSinkPromMaxConnections(); + @Key("SINK_PROM_SERVICE_URL") String getSinkPromServiceUrl(); diff --git a/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java b/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java index 22628d2e0..b9b5ccfd7 100644 --- a/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java +++ b/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java @@ -13,7 +13,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; -import org.apache.http.impl.conn.BasicHttpClientConnectionManager; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import java.util.Map; @@ -67,7 +67,12 @@ private static CloseableHttpClient newHttpClient(PromSinkConfig promSinkConfig) RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(promSinkConfig.getSinkPromRequestTimeoutMs()) .setConnectionRequestTimeout(promSinkConfig.getSinkPromRequestTimeoutMs()) .setConnectTimeout(promSinkConfig.getSinkPromRequestTimeoutMs()).build(); - BasicHttpClientConnectionManager connectionManager = new BasicHttpClientConnectionManager(); + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); + if (promSinkConfig.getSinkPromMaxConnections() != null && promSinkConfig.getSinkPromMaxConnections() > 0) { + connectionManager.setMaxTotal(promSinkConfig.getSinkPromMaxConnections()); + connectionManager.setDefaultMaxPerRoute(promSinkConfig.getSinkPromMaxConnections()); + } + HttpClientBuilder builder = HttpClients.custom().setConnectionManager(connectionManager).setDefaultRequestConfig(requestConfig); return builder.build();