Skip to content

Commit

Permalink
feat:支持pushgateway上报开启gzip压缩 (#401)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed May 30, 2023
1 parent 22cdebe commit 6ff7a2d
Show file tree
Hide file tree
Showing 10 changed files with 591 additions and 527 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class ServiceCallResultReport {

private static final int TOTAL_SERVICE = 20;

private static final int PER_INSTANCES = 1000;

public static void main(String[] args) throws Exception {
InitResult initResult = ExampleUtils.initConsumerConfiguration(args, true);
SDKContext context = ExampleUtils.createSDKContext(initResult.getConfig());
Expand All @@ -58,7 +60,7 @@ private static void mockService(ProviderAPI providerAPI) throws Exception {
for (int i = 0; i < TOTAL_SERVICE; i++) {
String service = servicePrefix + i;
String namespace = "MOCK_NAMESPACE_" + random.nextInt(3);
for (int j = 0; j < 2; j++) {
for (int j = 0; j < PER_INSTANCES; j++) {
InstanceRegisterRequest request = new InstanceRegisterRequest();
request.setNamespace(namespace);
request.setService(service);
Expand All @@ -85,7 +87,7 @@ private static void mockReportServiceCallResult(ConsumerAPI consumerAPI) {
"/api/v1/dog/" + namespace + "/" + service,
"/api/v1/cloud/" + namespace + "/" + service,
});
for (int j = 0; j < 2; j++) {
for (int j = 0; j < PER_INSTANCES; j++) {
if (random.nextBoolean()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,23 @@

public class StatInfoCollectorContainer {
private final StatInfoRevisionCollector<InstanceGauge> insCollector;
private final StatInfoRevisionCollector<RateLimitGauge> rateLimitCollector;
private final StatInfoStatefulCollector<CircuitBreakGauge> circuitBreakerCollector;

public StatInfoCollectorContainer() {
this.insCollector = new StatInfoRevisionCollector<InstanceGauge>();
this.rateLimitCollector = new StatInfoRevisionCollector<RateLimitGauge>();
this.circuitBreakerCollector = new StatInfoStatefulCollector<CircuitBreakGauge>();
}

public StatInfoRevisionCollector<InstanceGauge> getInsCollector() {
return insCollector;
}

public StatInfoRevisionCollector<RateLimitGauge> getRateLimitCollector() {
return rateLimitCollector;
}

public StatInfoStatefulCollector<CircuitBreakGauge> getCircuitBreakerCollector() {
return circuitBreakerCollector;
}

public List<StatInfoCollector<?, ? extends StatMetric>> getCollectors() {
return Arrays.asList(insCollector, rateLimitCollector, circuitBreakerCollector);
return Arrays.asList(insCollector, circuitBreakerCollector);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,7 @@ public static class SystemMetricLabelOrder {
SystemMetricName.RULE_NAME,
SystemMetricName.METRIC_NAME_LABEL,
};
public static final String[] RATELIMIT_GAUGE_LABEL_ORDER = new String[]{
SystemMetricName.CALLEE_NAMESPACE,
SystemMetricName.CALLEE_SERVICE,
SystemMetricName.CALLEE_METHOD,
SystemMetricName.CALLER_LABELS,
SystemMetricName.METRIC_NAME_LABEL,
SystemMetricName.RULE_NAME,
};

public static final String[] CIRCUIT_BREAKER_LABEL_ORDER = new String[]{
SystemMetricName.CALLEE_NAMESPACE,
SystemMetricName.CALLEE_SERVICE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package com.tencent.polaris.plugins.stat.prometheus.exporter;

import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Map;
import java.util.zip.GZIPOutputStream;

import javax.xml.bind.DatatypeConverter;

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.DefaultHttpConnectionFactory;
import io.prometheus.client.exporter.HttpConnectionFactory;
import io.prometheus.client.exporter.common.TextFormat;

public class PushGateway extends io.prometheus.client.exporter.PushGateway {

private static final int MILLISECONDS_PER_SECOND = 1000;

private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory();


public PushGateway(String address) {
super(address);
}

public PushGateway(URL serverBaseURL) {
super(serverBaseURL);
}

@Override
public void setConnectionFactory(HttpConnectionFactory connectionFactory) {
super.setConnectionFactory(connectionFactory);
this.connectionFactory = connectionFactory;
}

/**
* Pushes all metrics in a registry, replacing only previously pushed metrics of the same name, job and grouping key.
* <p>
* This uses the POST HTTP method.
*/
public void pushAddByGzip(CollectorRegistry registry, String job, Map<String, String> groupingKey) throws IOException {
doRequestByGzip(registry, job, groupingKey, "POST");
}

void doRequestByGzip(CollectorRegistry registry, String job, Map<String, String> groupingKey, String method) throws IOException {
String url = gatewayBaseURL;
if (job.contains("/")) {
url += "job@base64/" + base64url(job);
}
else {
url += "job/" + URLEncoder.encode(job, "UTF-8");
}

if (groupingKey != null) {
for (Map.Entry<String, String> entry : groupingKey.entrySet()) {
if (entry.getValue().isEmpty()) {
url += "/" + entry.getKey() + "@base64/=";
}
else if (entry.getValue().contains("/")) {
url += "/" + entry.getKey() + "@base64/" + base64url(entry.getValue());
}
else {
url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8");
}
}
}
HttpURLConnection connection = connectionFactory.create(url);
connection.setRequestProperty("Content-Type", TextFormat.CONTENT_TYPE_004);
connection.setRequestProperty("Content-Encoding", "gzip");
if (!method.equals("DELETE")) {
connection.setDoOutput(true);
}
connection.setRequestMethod(method);

connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND);
connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND);
connection.connect();

try {
if (!method.equals("DELETE")) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, "UTF-8"));
TextFormat.write004(writer, registry.metricFamilySamples());
writer.flush();
writer.close();
GZIPOutputStream zipStream = new GZIPOutputStream(connection.getOutputStream());
zipStream.write(outputStream.toByteArray());
zipStream.finish();
zipStream.flush();
zipStream.close();
}

int response = connection.getResponseCode();
if (response / 100 != 2) {
String errorMessage;
InputStream errorStream = connection.getErrorStream();
if (errorStream != null) {
String errBody = readFromStream(errorStream);
errorMessage = "Response code from " + url + " was " + response + ", response body: " + errBody;
}
else {
errorMessage = "Response code from " + url + " was " + response;
}
throw new IOException(errorMessage);
}
}
finally {
connection.disconnect();
}
}

private static String base64url(String v) {
// Per RFC4648 table 2. We support Java 6, and java.util.Base64 was only added in Java 8,
try {
return DatatypeConverter.printBase64Binary(v.getBytes("UTF-8")).replace("+", "-").replace("/", "_");
}
catch (UnsupportedEncodingException e) {
throw new RuntimeException(e); // Unreachable.
}
}

private static String readFromStream(InputStream is) throws IOException {
ByteArrayOutputStream result = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int length;
while ((length = is.read(buffer)) != -1) {
result.write(buffer, 0, length);
}
return result.toString("UTF-8");
}

private static class LocalByteArray extends ThreadLocal<ByteArrayOutputStream> {

private LocalByteArray() {
}

protected ByteArrayOutputStream initialValue() {
return new ByteArrayOutputStream(1048576);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,32 +137,6 @@ public static Map<String, String> convertInsGaugeToLabels(InstanceGauge insGauge
return labels;
}

public static Map<String, String> convertRateLimitGaugeToLabels(RateLimitGauge rateLimitGauge) {
Map<String, String> labels = new HashMap<>();
for (String labelName : SystemMetricModel.SystemMetricLabelOrder.RATELIMIT_GAUGE_LABEL_ORDER) {
switch (labelName) {
case SystemMetricName.CALLEE_NAMESPACE:
addLabel(labelName, rateLimitGauge.getNamespace(), labels);
break;
case SystemMetricName.CALLEE_SERVICE:
addLabel(labelName, rateLimitGauge.getService(), labels);
break;
case SystemMetricName.CALLEE_METHOD:
addLabel(labelName, rateLimitGauge.getMethod(), labels);
break;
case SystemMetricName.CALLER_LABELS:
addLabel(labelName, rateLimitGauge.getLabels(), labels);
break;
case SystemMetricName.RULE_NAME:
addLabel(labelName, rateLimitGauge.getRuleName(), labels);
break;
default:
}
}

return labels;
}

public static Map<String, String> convertCircuitBreakToLabels(CircuitBreakGauge gauge, String callerIp) {
Map<String, String> labels = new HashMap<>();
for (String labelName : SystemMetricModel.SystemMetricLabelOrder.CIRCUIT_BREAKER_LABEL_ORDER) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.GZIPOutputStream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -71,12 +72,10 @@ public void handle(HttpExchange exchange) throws IOException {
if (shouldUseCompression(exchange)) {
exchange.getResponseHeaders().set("Content-Encoding", "gzip");
exchange.sendResponseHeaders(200, 0L);
GZIPOutputStream os = new GZIPOutputStream(exchange.getResponseBody());

try {
response.writeTo(os);
} finally {
os.close();
try (GZIPOutputStream os = new GZIPOutputStream(exchange.getResponseBody())) {
os.write(response.toByteArray());
os.finish();
os.flush();
}
} else {
exchange.getResponseHeaders().set("Content-Length", String.valueOf(response.size()));
Expand All @@ -88,17 +87,17 @@ public void handle(HttpExchange exchange) throws IOException {
}

private boolean shouldUseCompression(HttpExchange exchange) {
// List<String> encodingHeaders = exchange.getRequestHeaders().get("Accept-Encoding");
// if (encodingHeaders != null) {
// for (String encodingHeader : encodingHeaders) {
// String[] encodings = encodingHeader.split(",");
// for (String encoding : encodings) {
// if ("gzip".equalsIgnoreCase(encoding.trim())) {
// return true;
// }
// }
// }
// }
List<String> encodingHeaders = exchange.getRequestHeaders().get("Accept-Encoding");
if (encodingHeaders != null) {
for (String encodingHeader : encodingHeaders) {
String[] encodings = encodingHeader.split(",");
for (String encoding : encodings) {
if ("gzip".equalsIgnoreCase(encoding.trim())) {
return true;
}
}
}
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ public class PrometheusHandlerConfig implements Verifier {

@JsonProperty
@JsonDeserialize(using = TimeStrJsonDeserializer.class)
private Long pushInterval;
private Long pushInterval = 10000L;

@JsonProperty
private Boolean openGzip = false;

public PrometheusHandlerConfig() {
}
Expand Down Expand Up @@ -91,7 +94,18 @@ public void setDefault(Object defaultObject) {
setAddress(config.getAddress());
}
if (null == pushInterval) {
setPushInterval(config.getPushInterval());
if (config.getPushInterval() != null) {
setPushInterval(config.getPushInterval());
} else {
setPushInterval(10000L);
}
}
if (null == openGzip) {
if (config.isOpenGzip() != null) {
setOpenGzip(config.isOpenGzip());
} else {
setOpenGzip(false);
}
}
}
}
Expand Down Expand Up @@ -143,4 +157,12 @@ public Long getPushInterval() {
public void setPushInterval(Long pushInterval) {
this.pushInterval = pushInterval;
}

public Boolean isOpenGzip() {
return openGzip;
}

public void setOpenGzip(Boolean openGzip) {
this.openGzip = openGzip;
}
}

0 comments on commit 6ff7a2d

Please sign in to comment.