-
Notifications
You must be signed in to change notification settings - Fork 25
/
StreamsGetMetricsHandler.java
122 lines (106 loc) 路 5.86 KB
/
StreamsGetMetricsHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/*
* Copyright 2019-2020 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.azkarra.http.handler;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import io.streamthoughts.azkarra.api.AzkarraStreamsService;
import io.streamthoughts.azkarra.api.errors.AzkarraException;
import io.streamthoughts.azkarra.api.model.Metric;
import io.streamthoughts.azkarra.api.model.MetricGroup;
import io.streamthoughts.azkarra.api.model.predicate.GroupMetricFilter;
import io.streamthoughts.azkarra.api.model.predicate.NameMetricFilter;
import io.streamthoughts.azkarra.api.model.predicate.NonNullMetricFilter;
import io.streamthoughts.azkarra.api.monad.Tuple;
import io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer;
import io.streamthoughts.azkarra.http.error.MetricNotFoundException;
import io.streamthoughts.azkarra.http.prometheus.KafkaStreamsMetricsCollector;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import static io.streamthoughts.azkarra.http.ExchangeHelper.getOptionalQueryParam;
import static io.streamthoughts.azkarra.http.ExchangeHelper.getQueryParam;
import static io.streamthoughts.azkarra.http.ExchangeHelper.sendJsonResponse;
import static io.streamthoughts.azkarra.http.ExchangeHelper.sendTextValue;
import static io.streamthoughts.azkarra.http.utils.Constants.HTTP_QUERY_PARAM_FILTER_EMPTY;
import static io.streamthoughts.azkarra.http.utils.Constants.HTTP_QUERY_PARAM_FORMAT;
import static io.streamthoughts.azkarra.http.utils.Constants.HTTP_QUERY_PARAM_FORMAT_VALUE_PROMETHEUS;
import static io.streamthoughts.azkarra.http.utils.Constants.HTTP_QUERY_PARAM_GROUP;
import static io.streamthoughts.azkarra.http.utils.Constants.HTTP_QUERY_PARAM_ID;
import static io.streamthoughts.azkarra.http.utils.Constants.HTTP_QUERY_PARAM_METRIC;
public class StreamsGetMetricsHandler extends AbstractStreamHttpHandler {
/**
* Creates a new {@link StreamsGetMetricsHandler} instance.
*
* @param service the {@link AzkarraStreamsService} instance.
*/
public StreamsGetMetricsHandler(final AzkarraStreamsService service) {
super(service);
}
/**
* {@inheritDoc}
*/
@Override
public void handleRequest(final HttpServerExchange exchange) {
final Optional<String> empty = getOptionalQueryParam(exchange, HTTP_QUERY_PARAM_FILTER_EMPTY);
final Optional<String> group = getOptionalQueryParam(exchange, HTTP_QUERY_PARAM_GROUP);
final Optional<String> name = getOptionalQueryParam(exchange, HTTP_QUERY_PARAM_METRIC);
final Optional<String> format = getOptionalQueryParam(exchange, HTTP_QUERY_PARAM_FORMAT);
final String containerId = getQueryParam(exchange, HTTP_QUERY_PARAM_ID);
Optional<Predicate<Tuple<String, Metric>>> all = Optional.of(t -> true);
Predicate<Tuple<String, Metric>> filter = all
.map(predicate -> empty.map(f -> new NonNullMetricFilter()).map(predicate::and).orElse(predicate))
.map(predicate -> group.map(GroupMetricFilter::new).map(predicate::and).orElse(predicate))
.map(predicate -> name.map(NameMetricFilter::new).map(predicate::and).orElse(predicate))
.get();
if (format.isPresent() && format.get().equals(HTTP_QUERY_PARAM_FORMAT_VALUE_PROMETHEUS)) {
CollectorRegistry registry = new CollectorRegistry();
new KafkaStreamsMetricsCollector(service, filter, containerId).register(registry);
exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, TextFormat.CONTENT_TYPE_004);
OutputStream outputStream = exchange.getOutputStream();
try (OutputStreamWriter writer = new OutputStreamWriter(outputStream)) {
TextFormat.write004(writer, registry.filteredMetricFamilySamples(Collections.emptySet()));
} catch (final IOException e) {
throw new AzkarraException("Unexpected error happens while writing metrics", e);
}
} else {
final KafkaStreamsContainer container = service.getStreamsContainerById(containerId);
final Set<MetricGroup> groupSet = container.metrics(KafkaStreamsContainer.KafkaMetricFilter.of(filter));
final Optional<Metric> metric = groupSet.stream()
.flatMap(g -> g.metrics().stream())
.findFirst();
if (metric.isEmpty() && name.isPresent()) {
throw new MetricNotFoundException("{group=\"" + group.get() + "\", metric=" + name.get() + "}");
}
if (metric.isEmpty() && group.isPresent()) {
throw new MetricNotFoundException("{group=\"" + group.get() + "\"}");
}
if (name.isPresent() && exchange.getRelativePath().endsWith("/value")) {
sendTextValue(exchange, metric.get().value());
} else {
sendJsonResponse(exchange, groupSet);
}
}
}
}