diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java index 2ea6ce367e704a..fa227057eff478 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java @@ -74,6 +74,8 @@ import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.PublisherStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; @@ -378,7 +380,7 @@ private static SinkConfig createSinkConfig(String tenant, String namespace, Stri * * @throws Exception */ - private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception { + private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl, int parallelism) throws Exception { final String namespacePortion = "io"; final String replNamespace = tenant + "/" + namespacePortion; @@ -401,6 +403,7 @@ private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Excepti functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); functionConfig.setJar(jarFilePathUrl); + functionConfig.setParallelism(parallelism); int metricsPort = FunctionCommon.findAvailablePort(); @Cleanup LocalRunner localRunner = LocalRunner.builder() @@ -415,15 +418,25 @@ private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Excepti .brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build(); localRunner.start(false); - retryStrategically((test) -> { + Assert.assertTrue(retryStrategically((test) -> { try { - TopicStats stats = admin.topics().getStats(sourceTopic); - return stats.subscriptions.get(subscriptionName) != null - && !stats.subscriptions.get(subscriptionName).consumers.isEmpty(); + + boolean result = false; + TopicStats topicStats = admin.topics().getStats(sourceTopic); + if (topicStats.subscriptions.containsKey(subscriptionName) + && topicStats.subscriptions.get(subscriptionName).consumers.size() == parallelism) { + for (ConsumerStats consumerStats : topicStats.subscriptions.get(subscriptionName).consumers) { + result = consumerStats.availablePermits == 1000 + && consumerStats.metadata != null + && consumerStats.metadata.containsKey("id") + && consumerStats.metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, functionName)); + } + } + return result; } catch (PulsarAdminException e) { return false; } - }, 50, 150); + }, 50, 150)); // validate pulsar sink consumer has started on the topic TopicStats stats = admin.topics().getStats(sourceTopic); assertTrue(stats.subscriptions.get(subscriptionName) != null @@ -459,16 +472,31 @@ private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Excepti String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort); log.info("prometheus metrics: {}", prometheusMetrics); - Map metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics); - assertFalse(metrics.isEmpty()); - - PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_processed_successfully_total"); - assertEquals(m.tags.get("cluster"), config.getClusterName()); - assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("name"), functionName); - assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); - assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName)); - assertEquals(m.value, 5.0); + Map metricsMap = new HashMap<>(); + Arrays.asList(prometheusMetrics.split("\n")).forEach(line -> { + if (line.startsWith("pulsar_function_processed_successfully_total")) { + Map metrics = PulsarFunctionTestUtils.parseMetrics(line); + assertFalse(metrics.isEmpty()); + PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_processed_successfully_total"); + if (m != null) { + metricsMap.put(m.tags.get("instance_id"), m); + } + } + }); + Assert.assertEquals(metricsMap.size(), parallelism); + + double totalMsgRecv = 0.0; + for (int i = 0; i < parallelism; i++) { + PulsarFunctionTestUtils.Metric m = metricsMap.get(String.valueOf(i)); + Assert.assertNotNull(m); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), String.valueOf(i)); + assertEquals(m.tags.get("name"), functionName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName)); + totalMsgRecv += m.value; + } + Assert.assertEquals(totalMsgRecv, totalMsgs); // stop functions localRunner.stop(); @@ -507,7 +535,11 @@ private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Excepti } } - public void testAvroFunctionLocalRun(String jarFilePathUrl) throws Exception { + private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception { + testE2EPulsarFunctionLocalRun(jarFilePathUrl, 1); + } + + private void testAvroFunctionLocalRun(String jarFilePathUrl) throws Exception { final String namespacePortion = "io"; final String replNamespace = tenant + "/" + namespacePortion; @@ -649,7 +681,12 @@ public void testE2EPulsarFunctionLocalRunURL() throws Exception { testE2EPulsarFunctionLocalRun(fileServer.getUrl("/pulsar-functions-api-examples.jar")); } - private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception { + @Test(timeOut = 40000) + public void testE2EPulsarFunctionLocalRunMultipleInstances() throws Throwable { + runWithPulsarFunctionsClassLoader(() -> testE2EPulsarFunctionLocalRun(null, 2)); + } + + private void testPulsarSourceLocalRun(String jarFilePathUrl, int parallelism) throws Exception { final String namespacePortion = "io"; final String replNamespace = tenant + "/" + namespacePortion; final String sinkTopic = "persistent://" + replNamespace + "/output"; @@ -664,6 +701,8 @@ private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception { } sourceConfig.setArchive(jarFilePathUrl); + sourceConfig.setParallelism(parallelism); + int metricsPort = FunctionCommon.findAvailablePort(); @Cleanup LocalRunner localRunner = LocalRunner.builder() .sourceConfig(sourceConfig) @@ -675,55 +714,84 @@ private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception { .tlsHostNameVerificationEnabled(false) .brokerServiceUrl(pulsar.getBrokerServiceUrlTls()) .connectorsDirectory(workerConfig.getConnectorsDirectory()) + .metricsPortStart(metricsPort) .build(); localRunner.start(false); - retryStrategically((test) -> { + Assert.assertTrue(retryStrategically((test) -> { try { - return (admin.topics().getStats(sinkTopic).publishers.size() == 1); + return admin.topics().getStats(sinkTopic).publishers.size() == parallelism; } catch (PulsarAdminException e) { return false; } - }, 10, 150); + }, 10, 150)); - retryStrategically((test) -> { + Assert.assertTrue(retryStrategically((test) -> { try { + boolean result = false; TopicStats sourceStats = admin.topics().getStats(sinkTopic); - return sourceStats.publishers.size() == 1 - && sourceStats.publishers.get(0).metadata != null - && sourceStats.publishers.get(0).metadata.containsKey("id") - && sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName)); + if (sourceStats.publishers.size() == parallelism) { + for (PublisherStats publisher : sourceStats.publishers) { + result = publisher.metadata != null + && publisher.metadata.containsKey("id") + && publisher.metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName)); + } + } + + return result; } catch (PulsarAdminException e) { return false; } - }, 50, 150); - - TopicStats sourceStats = admin.topics().getStats(sinkTopic); - assertEquals(sourceStats.publishers.size(), 1); - assertNotNull(sourceStats.publishers.get(0).metadata); - assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id")); - assertEquals(sourceStats.publishers.get(0).metadata.get("id"), String.format("%s/%s/%s", tenant, namespacePortion, sourceName)); + }, 50, 150)); - retryStrategically((test) -> { + Assert.assertTrue(retryStrategically((test) -> { try { - return (admin.topics().getStats(sinkTopic).publishers.size() == 1) + return (admin.topics().getStats(sinkTopic).publishers.size() == parallelism) && (admin.topics().getInternalStats(sinkTopic, false).numberOfEntries > 4); } catch (PulsarAdminException e) { return false; } - }, 50, 150); - assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1); + }, 50, 150)); + assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), parallelism); + + // validate prometheus metrics + String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort); + log.info("prometheus metrics: {}", prometheusMetrics); + + Map metricsMap = new HashMap<>(); + Arrays.asList(prometheusMetrics.split("\n")).forEach(line -> { + if (line.startsWith("pulsar_source_written_total")) { + Map metrics = PulsarFunctionTestUtils.parseMetrics(line); + assertFalse(metrics.isEmpty()); + PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_source_written_total"); + if (m != null) { + metricsMap.put(m.tags.get("instance_id"), m); + } + } + }); + Assert.assertEquals(metricsMap.size(), parallelism); + + for (int i = 0; i < parallelism; i++) { + PulsarFunctionTestUtils.Metric m = metricsMap.get(String.valueOf(i)); + Assert.assertNotNull(m); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), String.valueOf(i)); + assertEquals(m.tags.get("name"), sourceName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName)); + assertTrue(m.value > 0.0); + } localRunner.stop(); - retryStrategically((test) -> { + Assert.assertTrue(retryStrategically((test) -> { try { return (admin.topics().getStats(sinkTopic).publishers.size() == 0); } catch (PulsarAdminException e) { return e.getStatusCode() == 404; } - }, 10, 150); + }, 10, 150)); try { assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 0); @@ -734,6 +802,10 @@ private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception { } } + private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception { + testPulsarSourceLocalRun(jarFilePathUrl, 1); + } + @Test(timeOut = 20000, groups = "builtin") public void testPulsarSourceStatsBuiltin() throws Exception { testPulsarSourceLocalRun(String.format("%s://data-generator", Utils.BUILTIN)); @@ -755,8 +827,12 @@ public void testPulsarSourceLocalRunWithUrl() throws Exception { testPulsarSourceLocalRun(fileServer.getUrl("/pulsar-io-data-generator.nar")); } + @Test(timeOut = 40000) + public void testPulsarSourceLocalRunMultipleInstances() throws Throwable { + runWithNarClassLoader(() -> testPulsarSourceLocalRun(null, 2)); + } - private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception { + private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) throws Exception { final String namespacePortion = "io"; final String replNamespace = tenant + "/" + namespacePortion; final String sourceTopic = "persistent://" + replNamespace + "/input"; @@ -779,6 +855,7 @@ private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception { } sinkConfig.setArchive(jarFilePathUrl); + sinkConfig.setParallelism(parallelism); int metricsPort = FunctionCommon.findAvailablePort(); @Cleanup LocalRunner localRunner = LocalRunner.builder() @@ -796,58 +873,70 @@ private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception { localRunner.start(false); - retryStrategically((test) -> { + Assert.assertTrue(retryStrategically((test) -> { try { + boolean result = false; TopicStats topicStats = admin.topics().getStats(sourceTopic); - - return topicStats.subscriptions.containsKey(subscriptionName) - && topicStats.subscriptions.get(subscriptionName).consumers.size() == 1 - && topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits == 1000; - + if (topicStats.subscriptions.containsKey(subscriptionName) + && topicStats.subscriptions.get(subscriptionName).consumers.size() == parallelism) { + for (ConsumerStats consumerStats : topicStats.subscriptions.get(subscriptionName).consumers) { + result = consumerStats.availablePermits == 1000; + } + } + return result; } catch (PulsarAdminException e) { return false; } - }, 20, 150); - - TopicStats topicStats = admin.topics().getStats(sourceTopic); - assertEquals(topicStats.subscriptions.size(), 1); - assertTrue(topicStats.subscriptions.containsKey(subscriptionName)); - assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1); - assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits, 1000); + }, 20, 150)); int totalMsgs = 10; for (int i = 0; i < totalMsgs; i++) { String data = "my-message-" + i; producer.newMessage().property(propertyKey, propertyValue).value(data).send(); } - retryStrategically((test) -> { + Assert.assertTrue(retryStrategically((test) -> { try { SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs; } catch (PulsarAdminException e) { return false; } - }, 5, 200); + }, 5, 200)); // validate prometheus metrics String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort); log.info("prometheus metrics: {}", prometheusMetrics); - Map metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics); - assertFalse(metrics.isEmpty()); - - PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_written_total"); - assertEquals(m.tags.get("cluster"), config.getClusterName()); - assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("name"), sinkName); - assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); - assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName)); - assertEquals(m.value, 10.0); + Map metricsMap = new HashMap<>(); + Arrays.asList(prometheusMetrics.split("\n")).forEach(line -> { + if (line.startsWith("pulsar_sink_written_total")) { + Map metrics = PulsarFunctionTestUtils.parseMetrics(line); + assertFalse(metrics.isEmpty()); + PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_written_total"); + if (m != null) { + metricsMap.put(m.tags.get("instance_id"), m); + } + } + }); + Assert.assertEquals(metricsMap.size(), parallelism); + + double totalNumRecvMsg = 0; + for (int i = 0; i < parallelism; i++) { + PulsarFunctionTestUtils.Metric m = metricsMap.get(String.valueOf(i)); + Assert.assertNotNull(m); + assertEquals(m.tags.get("cluster"), config.getClusterName()); + assertEquals(m.tags.get("instance_id"), String.valueOf(i)); + assertEquals(m.tags.get("name"), sinkName); + assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); + assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName)); + totalNumRecvMsg += m.value; + } + assertEquals(totalNumRecvMsg, totalMsgs); // stop sink localRunner.stop(); - retryStrategically((test) -> { + Assert.assertTrue(retryStrategically((test) -> { try { TopicStats stats = admin.topics().getStats(sourceTopic); return stats.subscriptions.get(subscriptionName) != null @@ -855,14 +944,18 @@ private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception { } catch (PulsarAdminException e) { return false; } - }, 20, 150); + }, 20, 150)); - topicStats = admin.topics().getStats(sourceTopic); + TopicStats topicStats = admin.topics().getStats(sourceTopic); assertTrue(topicStats.subscriptions.get(subscriptionName) != null && topicStats.subscriptions.get(subscriptionName).consumers.isEmpty()); } + private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception { + testPulsarSourceLocalRun(jarFilePathUrl, 1); + } + @Test(timeOut = 20000, groups = "builtin") public void testPulsarSinkStatsBuiltin() throws Exception { testPulsarSinkLocalRun(String.format("%s://data-generator", Utils.BUILTIN)); @@ -873,6 +966,22 @@ public void testPulsarSinkStatsNoArchive() throws Throwable { runWithNarClassLoader(() -> testPulsarSinkLocalRun(null)); } + @Test(timeOut = 20000) + public void testPulsarSinkStatsWithFile() throws Exception { + String jarFilePathUrl = getPulsarIODataGeneratorNar().toURI().toString(); + testPulsarSinkLocalRun(jarFilePathUrl); + } + + @Test(timeOut = 40000) + public void testPulsarSinkStatsWithUrl() throws Exception { + testPulsarSinkLocalRun(fileServer.getUrl("/pulsar-io-data-generator.nar")); + } + + @Test(timeOut = 40000) + public void testPulsarSinkStatsMultipleInstances() throws Throwable { + runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 2)); + } + private void runWithNarClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try (NarClassLoader classLoader = NarClassLoader.getFromArchive(getPulsarIODataGeneratorNar(), Collections.emptySet(), originalClassLoader, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR)) { @@ -894,15 +1003,4 @@ private void runWithPulsarFunctionsClassLoader(Assert.ThrowingRunnable throwingR Thread.currentThread().setContextClassLoader(originalClassLoader); } } - - @Test(timeOut = 20000) - public void testPulsarSinkStatsWithFile() throws Exception { - String jarFilePathUrl = getPulsarIODataGeneratorNar().toURI().toString(); - testPulsarSinkLocalRun(jarFilePathUrl); - } - - @Test(timeOut = 40000) - public void testPulsarSinkStatsWithUrl() throws Exception { - testPulsarSinkLocalRun(fileServer.getUrl("/pulsar-io-data-generator.nar")); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java index 769cfe9b237957..570cbcbcc1b299 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java @@ -93,7 +93,6 @@ public static Map parseMetrics(String metrics) { parsed.put(name, m); }); - log.info("parsed metrics: {}", parsed); return parsed; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index e9273325386614..357bbcf3ede530 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -56,6 +56,7 @@ import org.apache.pulsar.functions.instance.state.DefaultStateStore; import org.apache.pulsar.functions.instance.state.StateManager; import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; +import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry; import org.apache.pulsar.functions.instance.stats.FunctionStatsManager; import org.apache.pulsar.functions.instance.stats.SinkStatsManager; import org.apache.pulsar.functions.instance.stats.SourceStatsManager; @@ -115,7 +116,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable private final Function.FunctionDetails.ComponentType componentType; public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, - SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels, + SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager, StateManager stateManager, PulsarAdmin pulsarAdmin) { this.config = config; @@ -172,15 +173,17 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, default: throw new RuntimeException("Unknown component type: " + componentType); } - this.userMetricsSummary = Summary.build() - .name(prefix + ComponentStatsManager.USER_METRIC_PREFIX) - .help("User defined metric.") - .labelNames(userMetricsLabelNames) - .quantile(0.5, 0.01) - .quantile(0.9, 0.01) - .quantile(0.99, 0.01) - .quantile(0.999, 0.01) - .register(collectorRegistry); + this.userMetricsSummary = collectorRegistry.registerIfNotExist( + prefix + ComponentStatsManager.USER_METRIC_PREFIX, + Summary.build() + .name(prefix + ComponentStatsManager.USER_METRIC_PREFIX) + .help("User defined metric.") + .labelNames(userMetricsLabelNames) + .quantile(0.5, 0.01) + .quantile(0.9, 0.01) + .quantile(0.99, 0.01) + .quantile(0.999, 0.01) + .create()); this.componentType = componentType; this.stateManager = stateManager; this.defaultStateStore = (DefaultStateStore) stateManager.getStore( diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index d761ccb108ec0d..36f8608928fc82 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -55,6 +55,7 @@ import org.apache.pulsar.functions.instance.state.StateStoreContextImpl; import org.apache.pulsar.functions.instance.state.StateStoreProvider; import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; +import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; @@ -106,7 +107,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private final SecretsProvider secretsProvider; - private CollectorRegistry collectorRegistry; + private FunctionCollectorRegistry collectorRegistry; private final String[] metricsLabels; private InstanceCache instanceCache; @@ -130,14 +131,13 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig, PulsarAdmin pulsarAdmin, String stateStorageServiceUrl, SecretsProvider secretsProvider, - CollectorRegistry collectorRegistry, + FunctionCollectorRegistry collectorRegistry, ClassLoader functionClassLoader) { this.instanceConfig = instanceConfig; this.client = (PulsarClientImpl) pulsarClient; this.pulsarAdmin = pulsarAdmin; this.stateStorageServiceUrl = stateStorageServiceUrl; this.secretsProvider = secretsProvider; - this.collectorRegistry = collectorRegistry; this.functionClassLoader = functionClassLoader; this.metricsLabels = new String[]{ instanceConfig.getFunctionDetails().getTenant(), @@ -171,7 +171,7 @@ synchronized private void setup() throws Exception { this.instanceCache = InstanceCache.getInstanceCache(); if (this.collectorRegistry == null) { - this.collectorRegistry = new CollectorRegistry(); + this.collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation(); } this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.getScheduledExecutorService(), diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java index cbdcc0fc27bd66..d822aa06c0a39d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java @@ -38,7 +38,7 @@ public abstract class ComponentStatsManager implements AutoCloseable { protected ScheduledFuture scheduledFuture; - protected final CollectorRegistry collectorRegistry; + protected final FunctionCollectorRegistry collectorRegistry; protected final EvictingQueue EMPTY_QUEUE = EvictingQueue.create(0); @@ -53,7 +53,7 @@ public abstract class ComponentStatsManager implements AutoCloseable { exceptionMetricsLabelNames[metricsLabelNames.length] = "error"; } - public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry, + public static ComponentStatsManager getStatsManager(FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService, Function.FunctionDetails.ComponentType componentType) { @@ -69,7 +69,7 @@ public static ComponentStatsManager getStatsManager(CollectorRegistry collectorR } } - public ComponentStatsManager(CollectorRegistry collectorRegistry, + public ComponentStatsManager(FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistry.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistry.java new file mode 100644 index 00000000000000..d29763999887dc --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistry.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.functions.instance.stats; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; + +/** + * Internal representation of Prometheus Collector Registry + */ +public abstract class FunctionCollectorRegistry extends CollectorRegistry { + public static FunctionCollectorRegistry getDefaultImplementation() { + return new FunctionCollectorRegistryImpl(); + } + + /** + * Register a metric if it does not yet exist. If it does exist, then return the existing metric. + * Currently, only needed by the LocalRunner when running in threaded and exposing metrics via a http server. + * This method helps resolve the conflict in which multiple instances within the LocalRunner process try to register the same metric. + * @param metricName the name of the metric + * @param collector the metric object e.g. Count, Gauge, etc. + * @param + * @return If the metric with the name `metricName` already exists, return the existing metric object. If not, return null + */ + public abstract T registerIfNotExist(String metricName, T collector); +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistryImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistryImpl.java new file mode 100644 index 00000000000000..9f003a4ab1ac05 --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionCollectorRegistryImpl.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.functions.instance.stats; + +import io.prometheus.client.Collector; + +import java.util.HashMap; +import java.util.Map; + +public class FunctionCollectorRegistryImpl extends FunctionCollectorRegistry { + + private final Map namesToCollectors = new HashMap(); + + public Collector registerIfNotExist(String metricName, Collector collector) { + synchronized (this) { + Collector existingCollector = namesToCollectors.get(metricName); + if (existingCollector == null) { + namesToCollectors.put(metricName, collector); + super.register(collector); + return collector; + } + return existingCollector; + } + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index f02b8505ebba4d..08ea9ea1e52c42 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -119,33 +119,41 @@ public class FunctionStatsManager extends ComponentStatsManager{ private final RateLimiter sysExceptionRateLimiter; - public FunctionStatsManager(CollectorRegistry collectorRegistry, + public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService) { super(collectorRegistry, metricsLabels, scheduledExecutorService); - statTotalProcessedSuccessfully = Counter.build() + statTotalProcessedSuccessfully = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL, + Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL) .help("Total number of messages processed successfully.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalProcessedSuccessfully = statTotalProcessedSuccessfully.labels(metricsLabels); - statTotalSysExceptions = Counter.build() + statTotalSysExceptions = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL , + Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL) .help("Total number of system exceptions.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels); - statTotalUserExceptions = Counter.build() + statTotalUserExceptions = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL, + Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL) .help("Total number of user exceptions.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalUserExceptions = statTotalUserExceptions.labels(metricsLabels); - statProcessLatency = Summary.build() + statProcessLatency = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS, + Summary.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS) .help("Process latency in milliseconds.") .quantile(0.5, 0.01) @@ -153,45 +161,57 @@ public FunctionStatsManager(CollectorRegistry collectorRegistry, .quantile(0.99, 0.01) .quantile(0.999, 0.01) .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statProcessLatency = statProcessLatency.labels(metricsLabels); - statlastInvocation = Gauge.build() + statlastInvocation = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION, + Gauge.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + LAST_INVOCATION) .help("The timestamp of the last invocation of the function.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statlastInvocation = statlastInvocation.labels(metricsLabels); - statTotalRecordsReceived = Counter.build() + statTotalRecordsReceived = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL, + Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL) .help("Total number of messages received from source.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels); - statTotalProcessedSuccessfully1min = Counter.build() + statTotalProcessedSuccessfully1min = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min, + Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min) .help("Total number of messages processed successfully in the last 1 minute.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalProcessedSuccessfully1min = statTotalProcessedSuccessfully1min.labels(metricsLabels); - statTotalSysExceptions1min = Counter.build() + statTotalSysExceptions1min = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min, + Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min) .help("Total number of system exceptions in the last 1 minute.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels); - statTotalUserExceptions1min = Counter.build() + statTotalUserExceptions1min = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min, + Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + USER_EXCEPTIONS_TOTAL_1min) .help("Total number of user exceptions in the last 1 minute.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalUserExceptions1min = statTotalUserExceptions1min.labels(metricsLabels); - statProcessLatency1min = Summary.build() + statProcessLatency1min = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min, + Summary.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min) .help("Process latency in milliseconds in the last 1 minute.") .quantile(0.5, 0.01) @@ -199,38 +219,48 @@ public FunctionStatsManager(CollectorRegistry collectorRegistry, .quantile(0.99, 0.01) .quantile(0.999, 0.01) .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statProcessLatency1min = statProcessLatency1min.labels(metricsLabels); - statTotalRecordsReceived1min = Counter.build() + statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min, + Counter.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + RECEIVED_TOTAL_1min) .help("Total number of messages received from source in the last 1 minute.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels); - userExceptions = Gauge.build() + userExceptions = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + "user_exception", + Gauge.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + "user_exception") .labelNames(exceptionMetricsLabelNames) .help("Exception from user code.") - .register(collectorRegistry); - sysExceptions = Gauge.build() + .create()); + sysExceptions = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + "system_exception", + Gauge.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + "system_exception") .labelNames(exceptionMetricsLabelNames) .help("Exception from system code.") - .register(collectorRegistry); + .create()); - sourceExceptions = Gauge.build() + sourceExceptions = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + "source_exception", + Gauge.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + "source_exception") .labelNames(exceptionMetricsLabelNames) .help("Exception from source.") - .register(collectorRegistry); + .create()); - sinkExceptions = Gauge.build() + sinkExceptions = collectorRegistry.registerIfNotExist( + PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception", + Gauge.build() .name(PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception") .labelNames(exceptionMetricsLabelNames) .help("Exception from sink.") - .register(collectorRegistry); + .create()); userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java index 401aa34be697c5..46999cd7010c5a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -95,84 +95,106 @@ public class SinkStatsManager extends ComponentStatsManager { private final RateLimiter sinkExceptionRateLimiter; - public SinkStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService + public SinkStatsManager(FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService) { super(collectorRegistry, metricsLabels, scheduledExecutorService); - statTotalRecordsReceived = Counter.build() + statTotalRecordsReceived = collectorRegistry.registerIfNotExist( + PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL, + Counter.build() .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL) .help("Total number of records sink has received from Pulsar topic(s).") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels); - statTotalSysExceptions = Counter.build() + statTotalSysExceptions = collectorRegistry.registerIfNotExist( + PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL, + Counter.build() .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL) .help("Total number of system exceptions.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels); - statTotalSinkExceptions = Counter.build() + statTotalSinkExceptions = collectorRegistry.registerIfNotExist( + PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL, + Counter.build() .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL) .help("Total number of sink exceptions.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalSinkExceptions = statTotalSinkExceptions.labels(metricsLabels); - statTotalWritten = Counter.build() + statTotalWritten = collectorRegistry.registerIfNotExist( + PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL, + Counter.build() .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL) .help("Total number of records processed by sink.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalWritten = statTotalWritten.labels(metricsLabels); - statlastInvocation = Gauge.build() + statlastInvocation = collectorRegistry.registerIfNotExist( + PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION, + Gauge.build() .name(PULSAR_SINK_METRICS_PREFIX + LAST_INVOCATION) .help("The timestamp of the last invocation of the sink.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statlastInvocation = statlastInvocation.labels(metricsLabels); - statTotalRecordsReceived1min = Counter.build() + statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist( + PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min, + Counter.build() .name(PULSAR_SINK_METRICS_PREFIX + RECEIVED_TOTAL_1min) .help("Total number of messages sink has received from Pulsar topic(s) in the last 1 minute.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels); - statTotalSysExceptions1min = Counter.build() + statTotalSysExceptions1min = collectorRegistry.registerIfNotExist( + PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min, + Counter.build() .name(PULSAR_SINK_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min) .help("Total number of system exceptions in the last 1 minute.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels); - statTotalSinkExceptions1min = Counter.build() + statTotalSinkExceptions1min = collectorRegistry.registerIfNotExist( + PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min, + Counter.build() .name(PULSAR_SINK_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min) .help("Total number of sink exceptions in the last 1 minute.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalSinkExceptions1min = statTotalSinkExceptions1min.labels(metricsLabels); - statTotalWritten1min = Counter.build() + statTotalWritten1min = collectorRegistry.registerIfNotExist( + PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min, + Counter.build() .name(PULSAR_SINK_METRICS_PREFIX + WRITTEN_TOTAL_1min) .help("Total number of records processed by sink the last 1 minute.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels); - sysExceptions = Gauge.build() + sysExceptions = collectorRegistry.registerIfNotExist( + PULSAR_SINK_METRICS_PREFIX + "system_exception", + Gauge.build() .name(PULSAR_SINK_METRICS_PREFIX + "system_exception") .labelNames(exceptionMetricsLabelNames) .help("Exception from system code.") - .register(collectorRegistry); + .create()); - sinkExceptions = Gauge.build() + sinkExceptions = collectorRegistry.registerIfNotExist( + PULSAR_SINK_METRICS_PREFIX + "sink_exception", + Gauge.build() .name(PULSAR_SINK_METRICS_PREFIX + "sink_exception") .labelNames(exceptionMetricsLabelNames) .help("Exception from sink.") - .register(collectorRegistry); + .create()); sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java index 287240c04e70cb..e79e0b5bffca1b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -94,84 +94,106 @@ public class SourceStatsManager extends ComponentStatsManager { protected final RateLimiter sourceExceptionRateLimiter; - public SourceStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService + public SourceStatsManager(FunctionCollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService) { super(collectorRegistry, metricsLabels, scheduledExecutorService); - statTotalRecordsReceived = Counter.build() + statTotalRecordsReceived = collectorRegistry.registerIfNotExist( + PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL, + Counter.build() .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL) .help("Total number of records received from source.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalRecordsReceived = statTotalRecordsReceived.labels(metricsLabels); - statTotalSysExceptions = Counter.build() + statTotalSysExceptions = collectorRegistry.registerIfNotExist( + PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL, + Counter.build() .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL) .help("Total number of system exceptions.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalSysExceptions = statTotalSysExceptions.labels(metricsLabels); - statTotalSourceExceptions = Counter.build() + statTotalSourceExceptions = collectorRegistry.registerIfNotExist( + PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL, + Counter.build() .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL) .help("Total number of source exceptions.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalSourceExceptions = statTotalSourceExceptions.labels(metricsLabels); - statTotalWritten = Counter.build() + statTotalWritten = collectorRegistry.registerIfNotExist( + PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL, + Counter.build() .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL) .help("Total number of records written to a Pulsar topic.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalWritten = statTotalWritten.labels(metricsLabels); - statlastInvocation = Gauge.build() + statlastInvocation = collectorRegistry.registerIfNotExist( + PULSAR_SOURCE_METRICS_PREFIX + LAST_INVOCATION, + Gauge.build() .name(PULSAR_SOURCE_METRICS_PREFIX + LAST_INVOCATION) .help("The timestamp of the last invocation of the source.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statlastInvocation = statlastInvocation.labels(metricsLabels); - statTotalRecordsReceived1min = Counter.build() + statTotalRecordsReceived1min = collectorRegistry.registerIfNotExist( + PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL_1min, + Counter.build() .name(PULSAR_SOURCE_METRICS_PREFIX + RECEIVED_TOTAL_1min) .help("Total number of records received from source in the last 1 minute.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalRecordsReceived1min = statTotalRecordsReceived1min.labels(metricsLabels); - statTotalSysExceptions1min = Counter.build() + statTotalSysExceptions1min = collectorRegistry.registerIfNotExist( + PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min, + Counter.build() .name(PULSAR_SOURCE_METRICS_PREFIX + SYSTEM_EXCEPTIONS_TOTAL_1min) .help("Total number of system exceptions in the last 1 minute.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalSysExceptions1min = statTotalSysExceptions1min.labels(metricsLabels); - statTotalSourceExceptions1min = Counter.build() + statTotalSourceExceptions1min = collectorRegistry.registerIfNotExist( + PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min, + Counter.build() .name(PULSAR_SOURCE_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min) .help("Total number of source exceptions in the last 1 minute.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalSourceExceptions1min = statTotalSourceExceptions1min.labels(metricsLabels); - statTotalWritten1min = Counter.build() + statTotalWritten1min = collectorRegistry.registerIfNotExist( + PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL_1min, + Counter.build() .name(PULSAR_SOURCE_METRICS_PREFIX + WRITTEN_TOTAL_1min) .help("Total number of records written to a Pulsar topic in the last 1 minute.") .labelNames(metricsLabelNames) - .register(collectorRegistry); + .create()); _statTotalWritten1min = statTotalWritten1min.labels(metricsLabels); - sysExceptions = Gauge.build() + sysExceptions = collectorRegistry.registerIfNotExist( + PULSAR_SOURCE_METRICS_PREFIX + "system_exception", + Gauge.build() .name(PULSAR_SOURCE_METRICS_PREFIX + "system_exception") .labelNames(exceptionMetricsLabelNames) .help("Exception from system code.") - .register(collectorRegistry); + .create()); - sourceExceptions = Gauge.build() + sourceExceptions = collectorRegistry.registerIfNotExist( + PULSAR_SOURCE_METRICS_PREFIX + "source_exception", + Gauge.build() .name(PULSAR_SOURCE_METRICS_PREFIX + "source_exception") .labelNames(exceptionMetricsLabelNames) .help("Exception from source.") - .register(collectorRegistry); + .create()); sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index ceb87c3197387e..8bc8a54a5c44a0 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.state.BKStateStoreImpl; import org.apache.pulsar.functions.instance.state.InstanceStateManager; +import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; import org.slf4j.Logger; @@ -92,7 +93,7 @@ public void setup() { config, logger, client, - new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0], + new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), pulsarAdmin); context.setCurrentMessageContext((Record) () -> null); @@ -182,7 +183,7 @@ public void testGetPulsarAdminWithExposePulsarAdminDisabled() { config, logger, client, - new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0], + new EnvironmentBasedSecretsProvider(), FunctionCollectorRegistry.getDefaultImplementation(), new String[0], FunctionDetails.ComponentType.FUNCTION, null, new InstanceStateManager(), pulsarAdmin); context.getPulsarAdmin(); diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java index 2a71f83a1514de..95201fe75f491f 100644 --- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java +++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java @@ -59,6 +59,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.runtime.RuntimeUtils; @@ -169,7 +170,7 @@ public RuntimeEnv convert(String value) { protected String secretsProviderClassName; @Parameter(names = "--secretsProviderConfig", description = "Whats the config for the secrets provider", hidden = true) protected String secretsProviderConfig; - @Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server", hidden = true) + @Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server. When running instances as threads, one metrics server is used to host the stats for all instances.", hidden = true) protected Integer metricsPortStart; private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650"; @@ -483,7 +484,16 @@ private void startProcessMode(org.apache.pulsar.functions.proto.Function.Functio instanceConfig.setInstanceId(i + instanceIdOffset); instanceConfig.setMaxBufferedTuples(1024); instanceConfig.setPort(FunctionCommon.findAvailablePort()); - instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort()); + + if (metricsPortStart != null) { + int metricsPort = metricsPortStart + i; + if (metricsPortStart < 0 || metricsPortStart > 65535) { + throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535"); + } + instanceConfig.setMetricsPort(metricsPort); + } else { + instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort()); + } instanceConfig.setClusterName("local"); if (functionConfig != null) { instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests()); @@ -535,6 +545,13 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi int parallelism, int instanceIdOffset, String serviceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, String userCodeFile) throws Exception { + + if (metricsPortStart != null) { + if (metricsPortStart < 0 || metricsPortStart > 65535) { + throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535"); + } + } + SecretsProvider secretsProvider; if (secretsProviderClassName != null) { secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader()); @@ -552,7 +569,7 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi } // Collector Registry for prometheus metrics - CollectorRegistry collectorRegistry = new CollectorRegistry(); + FunctionCollectorRegistry collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation(); RuntimeUtils.registerDefaultCollectors(collectorRegistry); ThreadRuntimeFactory threadRuntimeFactory; @@ -581,10 +598,7 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi instanceConfig.setInstanceId(i + instanceIdOffset); instanceConfig.setMaxBufferedTuples(1024); if (metricsPortStart != null) { - if (metricsPortStart < 0 || metricsPortStart > 65535) { - throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535"); - } - instanceConfig.setMetricsPort(metricsPortStart + i); + instanceConfig.setMetricsPort(metricsPortStart); } instanceConfig.setClusterName("local"); if (functionConfig != null) { @@ -602,13 +616,12 @@ private void startThreadedMode(org.apache.pulsar.functions.proto.Function.Functi 30000); spawners.add(runtimeSpawner); runtimeSpawner.start(); + } - if (metricsPortStart != null) { - // starting metrics server - log.info("Starting metrics server on port {}", instanceConfig.getMetricsPort()); - new HTTPServer(new InetSocketAddress(instanceConfig.getMetricsPort()), collectorRegistry, true); - } - + if (metricsPortStart != null) { + // starting metrics server + log.info("Starting metrics server on port {}", metricsPortStart); + new HTTPServer(new InetSocketAddress(metricsPortStart), collectorRegistry, true); } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java index 0de9782655a1ac..170e6060dd41dc 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java @@ -37,6 +37,7 @@ import org.apache.pulsar.common.functions.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceCache; import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceControlGrpc; @@ -191,7 +192,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL secretsProvider.init(secretsProviderConfigMap); // Collector Registry for prometheus metrics - CollectorRegistry collectorRegistry = new CollectorRegistry(); + FunctionCollectorRegistry collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation(); RuntimeUtils.registerDefaultCollectors(collectorRegistry); containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index b6736d44d943ff..f912a1402bbd41 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -47,6 +47,7 @@ import org.apache.pulsar.common.functions.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.go.GoInstanceConfig; +import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.utils.FunctionCommon; @@ -465,7 +466,7 @@ public static T getRuntimeFunctionConfig(Map configMap, Clas return ObjectMapperFactory.getThreadLocal().convertValue(configMap, functionRuntimeConfigClass); } - public static void registerDefaultCollectors(CollectorRegistry registry) { + public static void registerDefaultCollectors(FunctionCollectorRegistry registry) { // Add the JMX exporter for functionality similar to the kafka connect JMX metrics try { new JmxCollector("{}").register(registry); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java index 30aef25c9d313a..7f4bdbd8612dc7 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java @@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.InstanceUtils; +import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; @@ -66,7 +67,7 @@ public class ThreadRuntime implements Runtime { private PulsarAdmin pulsarAdmin; private String stateStorageServiceUrl; private SecretsProvider secretsProvider; - private CollectorRegistry collectorRegistry; + private FunctionCollectorRegistry collectorRegistry; private String narExtractionDirectory; private final Optional connectorsManager; @@ -78,7 +79,7 @@ public class ThreadRuntime implements Runtime { PulsarAdmin pulsarAdmin, String stateStorageServiceUrl, SecretsProvider secretsProvider, - CollectorRegistry collectorRegistry, + FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory, Optional connectorsManager) { this.instanceConfig = instanceConfig; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java index 1be328bda6f706..d1f450c9581410 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java @@ -33,6 +33,7 @@ import org.apache.pulsar.functions.instance.InstanceCache; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.InstanceUtils; +import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry; import org.apache.pulsar.functions.runtime.RuntimeCustomizer; import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeUtils; @@ -60,7 +61,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { private PulsarAdmin pulsarAdmin; private String storageServiceUrl; private SecretsProvider defaultSecretsProvider; - private CollectorRegistry collectorRegistry; + private FunctionCollectorRegistry collectorRegistry; private String narExtractionDirectory; private volatile boolean closed; private SecretsProviderConfigurator secretsProviderConfigurator; @@ -73,7 +74,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory { */ public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl, AuthenticationConfig authConfig, SecretsProvider secretsProvider, - CollectorRegistry collectorRegistry, String narExtractionDirectory, + FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory, ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled, String pulsarWebServiceUrl) throws Exception { initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig, @@ -83,7 +84,7 @@ public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, Str private void initialize(String threadGroupName, Optional memoryLimit, String pulsarServiceUrl, AuthenticationConfig authConfig, String storageServiceUrl, SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider, - CollectorRegistry collectorRegistry, String narExtractionDirectory, + FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory, ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled, String pulsarWebServiceUrl, Optional connectorsManager) throws PulsarClientException {