Skip to content

Commit

Permalink
Fix Pulsar Function localrun with multiple instances and metrics serv…
Browse files Browse the repository at this point in the history
…er is enabled (apache#10208)
  • Loading branch information
jerrypeng authored and Jerry Peng committed Apr 23, 2021
1 parent d4fa088 commit 95eba26
Show file tree
Hide file tree
Showing 16 changed files with 473 additions and 196 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public static Map<String, Metric> parseMetrics(String metrics) {
parsed.put(name, m);
});

log.info("parsed metrics: {}", parsed);
return parsed;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
Expand Down Expand Up @@ -115,7 +116,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
private final Function.FunctionDetails.ComponentType componentType;

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
StateManager stateManager, PulsarAdmin pulsarAdmin) {
this.config = config;
Expand Down Expand Up @@ -172,15 +173,17 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
default:
throw new RuntimeException("Unknown component type: " + componentType);
}
this.userMetricsSummary = Summary.build()
.name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
.help("User defined metric.")
.labelNames(userMetricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.register(collectorRegistry);
this.userMetricsSummary = collectorRegistry.registerIfNotExist(
prefix + ComponentStatsManager.USER_METRIC_PREFIX,
Summary.build()
.name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
.help("User defined metric.")
.labelNames(userMetricsLabelNames)
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.create());
this.componentType = componentType;
this.stateManager = stateManager;
this.defaultStateStore = (DefaultStateStore) stateManager.getStore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pulsar.functions.instance.state.StateStoreContextImpl;
import org.apache.pulsar.functions.instance.state.StateStoreProvider;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
Expand Down Expand Up @@ -106,7 +107,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {

private final SecretsProvider secretsProvider;

private CollectorRegistry collectorRegistry;
private FunctionCollectorRegistry collectorRegistry;
private final String[] metricsLabels;

private InstanceCache instanceCache;
Expand All @@ -130,14 +131,13 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry,
FunctionCollectorRegistry collectorRegistry,
ClassLoader functionClassLoader) {
this.instanceConfig = instanceConfig;
this.client = (PulsarClientImpl) pulsarClient;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
this.functionClassLoader = functionClassLoader;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
Expand Down Expand Up @@ -171,7 +171,7 @@ synchronized private void setup() throws Exception {
this.instanceCache = InstanceCache.getInstanceCache();

if (this.collectorRegistry == null) {
this.collectorRegistry = new CollectorRegistry();
this.collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
}
this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels,
this.instanceCache.getScheduledExecutorService(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {

protected ScheduledFuture<?> scheduledFuture;

protected final CollectorRegistry collectorRegistry;
protected final FunctionCollectorRegistry collectorRegistry;

protected final EvictingQueue EMPTY_QUEUE = EvictingQueue.create(0);

Expand All @@ -53,7 +53,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
}

public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
public static ComponentStatsManager getStatsManager(FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService,
Function.FunctionDetails.ComponentType componentType) {
Expand All @@ -69,7 +69,7 @@ public static ComponentStatsManager getStatsManager(CollectorRegistry collectorR
}
}

public ComponentStatsManager(CollectorRegistry collectorRegistry,
public ComponentStatsManager(FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.instance.stats;

import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;

/**
* Internal representation of Prometheus Collector Registry
*/
public abstract class FunctionCollectorRegistry extends CollectorRegistry {
public static FunctionCollectorRegistry getDefaultImplementation() {
return new FunctionCollectorRegistryImpl();
}

/**
* Register a metric if it does not yet exist. If it does exist, then return the existing metric.
* Currently, only needed by the LocalRunner when running in threaded and exposing metrics via a http server.
* This method helps resolve the conflict in which multiple instances within the LocalRunner process try to register the same metric.
* @param metricName the name of the metric
* @param collector the metric object e.g. Count, Gauge, etc.
* @param <T>
* @return If the metric with the name `metricName` already exists, return the existing metric object. If not, return null
*/
public abstract <T extends Collector> T registerIfNotExist(String metricName, T collector);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.instance.stats;

import io.prometheus.client.Collector;

import java.util.HashMap;
import java.util.Map;

public class FunctionCollectorRegistryImpl extends FunctionCollectorRegistry {

private final Map<String, Collector> namesToCollectors = new HashMap<String, Collector>();

public Collector registerIfNotExist(String metricName, Collector collector) {
synchronized (this) {
Collector existingCollector = namesToCollectors.get(metricName);
if (existingCollector == null) {
namesToCollectors.put(metricName, collector);
super.register(collector);
return collector;
}
return existingCollector;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,118 +119,148 @@ public class FunctionStatsManager extends ComponentStatsManager{

private final RateLimiter sysExceptionRateLimiter;

public FunctionStatsManager(CollectorRegistry collectorRegistry,
public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
ScheduledExecutorService scheduledExecutorService) {
super(collectorRegistry, metricsLabels, scheduledExecutorService);

statTotalProcessedSuccessfully = Counter.build()
statTotalProcessedSuccessfully = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL)
.help("Total number of messages processed successfully.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalProcessedSuccessfully = statTotalProcessedSuccessfully.labels(metricsLabels);

statTotalSysExceptions = Counter.build()
statTotalSysExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL ,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL)
.help("Total number of system exceptions.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels);

statTotalUserExceptions = Counter.build()
statTotalUserExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL)
.help("Total number of user exceptions.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalUserExceptions = statTotalUserExceptions.labels(metricsLabels);

statProcessLatency = Summary.build()
statProcessLatency = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS,
Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
.help("Process latency in milliseconds.")
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statProcessLatency = statProcessLatency.labels(metricsLabels);

statlastInvocation = Gauge.build()
statlastInvocation = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION,
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION)
.help("The timestamp of the last invocation of the function.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statlastInvocation = statlastInvocation.labels(metricsLabels);

statTotalRecordsReceived = Counter.build()
statTotalRecordsReceived = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL)
.help("Total number of messages received from source.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels);

statTotalProcessedSuccessfully1min = Counter.build()
statTotalProcessedSuccessfully1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min)
.help("Total number of messages processed successfully in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalProcessedSuccessfully1min = statTotalProcessedSuccessfully1min.labels(metricsLabels);

statTotalSysExceptions1min = Counter.build()
statTotalSysExceptions1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min)
.help("Total number of system exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels);

statTotalUserExceptions1min = Counter.build()
statTotalUserExceptions1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min)
.help("Total number of user exceptions in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalUserExceptions1min = statTotalUserExceptions1min.labels(metricsLabels);

statProcessLatency1min = Summary.build()
statProcessLatency1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min,
Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
.help("Process latency in milliseconds in the last 1 minute.")
.quantile(0.5, 0.01)
.quantile(0.9, 0.01)
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statProcessLatency1min = statProcessLatency1min.labels(metricsLabels);

statTotalRecordsReceived1min = Counter.build()
statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min,
Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min)
.help("Total number of messages received from source in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);
.create());
_statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels);

userExceptions = Gauge.build()
userExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + "user_exception",
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from user code.")
.register(collectorRegistry);
sysExceptions = Gauge.build()
.create());
sysExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + "system_exception",
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "system_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from system code.")
.register(collectorRegistry);
.create());

sourceExceptions = Gauge.build()
sourceExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + "source_exception",
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "source_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from source.")
.register(collectorRegistry);
.create());

sinkExceptions = Gauge.build()
sinkExceptions = collectorRegistry.registerIfNotExist(
PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception",
Gauge.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception")
.labelNames(exceptionMetricsLabelNames)
.help("Exception from sink.")
.register(collectorRegistry);
.create());

userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
Expand Down
Loading

0 comments on commit 95eba26

Please sign in to comment.