Skip to content

Commit

Permalink
[improve][fn] Make producer cache bounded and expiring in Functions/C…
Browse files Browse the repository at this point in the history
…onnectors (apache#22945)
  • Loading branch information
lhotari authored Jun 25, 2024
1 parent 69b2739 commit 6fe8100
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 205 deletions.
5 changes: 5 additions & 0 deletions pulsar-functions/instance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

<dependency>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
Expand Down Expand Up @@ -85,6 +84,7 @@
/**
* This class implements the Context interface exposed to the user.
*/
@Slf4j
@ToString(exclude = {"pulsarAdmin"})
class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable {
private final ProducerBuilderFactory producerBuilderFactory;
Expand All @@ -98,8 +98,6 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
private final ClientBuilder clientBuilder;
private final PulsarClient client;
private final PulsarAdmin pulsarAdmin;
private Map<String, Producer<?>> publishProducers;
private ThreadLocal<Map<String, Producer<?>>> tlPublishProducers;

private final TopicSchema topicSchema;

Expand Down Expand Up @@ -139,12 +137,15 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable

private final java.util.function.Consumer<Throwable> fatalHandler;

private final ProducerCache producerCache;
private final boolean useThreadLocalProducers;

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry,
String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
StateManager stateManager, PulsarAdmin pulsarAdmin, ClientBuilder clientBuilder,
java.util.function.Consumer<Throwable> fatalHandler) {
java.util.function.Consumer<Throwable> fatalHandler, ProducerCache producerCache) {
this.config = config;
this.logger = logger;
this.clientBuilder = clientBuilder;
Expand All @@ -154,14 +155,17 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
this.statsManager = statsManager;
this.fatalHandler = fatalHandler;

boolean useThreadLocalProducers = false;
this.producerCache = producerCache;

Function.ProducerSpec producerSpec = config.getFunctionDetails().getSink().getProducerSpec();
ProducerConfig producerConfig = null;
if (producerSpec != null) {
producerConfig = FunctionConfigUtils.convertProducerSpecToProducerConfig(producerSpec);
useThreadLocalProducers = producerSpec.getUseThreadLocalProducers();
} else {
useThreadLocalProducers = false;
}

producerBuilderFactory = new ProducerBuilderFactory(client, producerConfig,
Thread.currentThread().getContextClassLoader(),
// This is for backwards compatibility. The PR https://github.com/apache/pulsar/pull/19470 removed
Expand All @@ -175,12 +179,6 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
this.config.getFunctionDetails().getName()),
this.config.getInstanceId()));

if (useThreadLocalProducers) {
tlPublishProducers = new ThreadLocal<>();
} else {
publishProducers = new ConcurrentHashMap<>();
}

if (config.getFunctionDetails().getUserConfig().isEmpty()) {
userConfigs = new HashMap<>();
} else {
Expand Down Expand Up @@ -543,39 +541,15 @@ public void fatal(Throwable t) {
}

private <T> Producer<T> getProducer(String topicName, Schema<T> schema) throws PulsarClientException {
Producer<T> producer;
if (tlPublishProducers != null) {
Map<String, Producer<?>> producerMap = tlPublishProducers.get();
if (producerMap == null) {
producerMap = new HashMap<>();
tlPublishProducers.set(producerMap);
}
producer = (Producer<T>) producerMap.get(topicName);
} else {
producer = (Producer<T>) publishProducers.get(topicName);
}

if (producer == null) {
Producer<T> newProducer = producerBuilderFactory
.createProducerBuilder(topicName, schema, null)
.properties(producerProperties)
.create();

if (tlPublishProducers != null) {
tlPublishProducers.get().put(topicName, newProducer);
} else {
Producer<T> existingProducer = (Producer<T>) publishProducers.putIfAbsent(topicName, newProducer);

if (existingProducer != null) {
// The value in the map was not updated after the concurrent put
newProducer.close();
producer = existingProducer;
} else {
producer = newProducer;
}
}
}
return producer;
Long additionalCacheKey = useThreadLocalProducers ? Thread.currentThread().getId() : null;
return producerCache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE,
topicName, additionalCacheKey, () -> {
log.info("Initializing producer on topic {} with schema {}", topicName, schema);
return producerBuilderFactory
.createProducerBuilder(topicName, schema, null)
.properties(producerProperties)
.create();
});
}

public Map<String, Double> getAndResetMetrics() {
Expand Down Expand Up @@ -714,29 +688,9 @@ public void setUnderlyingBuilder(TypedMessageBuilder<T> underlyingBuilder) {

@Override
public void close() {
List<CompletableFuture> futures = new LinkedList<>();

if (publishProducers != null) {
for (Producer<?> producer : publishProducers.values()) {
futures.add(producer.closeAsync());
}
}

if (tlPublishProducers != null) {
for (Producer<?> producer : tlPublishProducers.get().values()) {
futures.add(producer.closeAsync());
}
}

if (pulsarAdmin != null) {
pulsarAdmin.close();
}

try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
} catch (InterruptedException | ExecutionException e) {
logger.warn("Failed to close producers", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private final AtomicReference<Schema<?>> sinkSchema = new AtomicReference<>();
private SinkSchemaInfoProvider sinkSchemaInfoProvider = null;

private final ProducerCache producerCache = new ProducerCache();

public JavaInstanceRunnable(InstanceConfig instanceConfig,
ClientBuilder clientBuilder,
PulsarClient pulsarClient,
Expand Down Expand Up @@ -292,7 +294,7 @@ ContextImpl setupContext() throws PulsarClientException {
Thread.currentThread().setContextClassLoader(functionClassLoader);
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager,
pulsarAdmin, clientBuilder, fatalHandler);
pulsarAdmin, clientBuilder, fatalHandler, producerCache);
} finally {
Thread.currentThread().setContextClassLoader(clsLoader);
}
Expand Down Expand Up @@ -607,6 +609,8 @@ public synchronized void close() {

instanceCache = null;

producerCache.close();

if (logAppender != null) {
removeLogTopicAppender(LoggerContext.getContext());
removeLogTopicAppender(LoggerContext.getContext(false));
Expand Down Expand Up @@ -1050,7 +1054,7 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
}

object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats,
this.functionClassLoader);
this.functionClassLoader, this.producerCache);
}
} else {
object = Reflections.createInstance(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
public class ProducerCache implements Closeable {
// allow tuning the cache timeout with PRODUCER_CACHE_TIMEOUT_SECONDS env variable
private static final int PRODUCER_CACHE_TIMEOUT_SECONDS =
Integer.parseInt(System.getenv().getOrDefault("PRODUCER_CACHE_TIMEOUT_SECONDS", "300"));
// allow tuning the cache size with PRODUCER_CACHE_MAX_SIZE env variable
private static final int PRODUCER_CACHE_MAX_SIZE =
Integer.parseInt(System.getenv().getOrDefault("PRODUCER_CACHE_MAX_SIZE", "10000"));
private static final int FLUSH_OR_CLOSE_TIMEOUT_SECONDS = 60;

// prevents the different producers created in different code locations from mixing up
public enum CacheArea {
// producers created by calling Context, SinkContext, SourceContext methods
CONTEXT_CACHE,
// producers created in Pulsar Sources, multiple topics are possible by returning destination topics
// by SinkRecord.getDestinationTopic call
SINK_RECORD_CACHE,
}

record ProducerCacheKey(CacheArea cacheArea, String topic, Object additionalKey) {
}

private final Cache<ProducerCacheKey, Producer<?>> cache;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final CopyOnWriteArrayList<CompletableFuture<Void>> closeFutures = new CopyOnWriteArrayList<>();

public ProducerCache() {
Caffeine<ProducerCacheKey, Producer> builder = Caffeine.newBuilder()
.scheduler(Scheduler.systemScheduler())
.<ProducerCacheKey, Producer>removalListener((key, producer, cause) -> {
log.info("Closing producer for topic {}, cause {}", key.topic(), cause);
CompletableFuture closeFuture =
producer.flushAsync()
.orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.exceptionally(ex -> {
log.error("Error flushing producer for topic {}", key.topic(), ex);
return null;
}).thenCompose(__ ->
producer.closeAsync().orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS,
TimeUnit.SECONDS)
.exceptionally(ex -> {
log.error("Error closing producer for topic {}", key.topic(),
ex);
return null;
}));
if (closed.get()) {
closeFutures.add(closeFuture);
}
})
.weigher((key, producer) -> Math.max(producer.getNumOfPartitions(), 1))
.maximumWeight(PRODUCER_CACHE_MAX_SIZE);
if (PRODUCER_CACHE_TIMEOUT_SECONDS > 0) {
builder.expireAfterAccess(Duration.ofSeconds(PRODUCER_CACHE_TIMEOUT_SECONDS));
}
cache = builder.build();
}

public <T> Producer<T> getOrCreateProducer(CacheArea cacheArea, String topicName, Object additionalCacheKey,
Callable<Producer<T>> supplier) {
if (closed.get()) {
throw new IllegalStateException("ProducerCache is already closed");
}
return (Producer<T>) cache.get(new ProducerCacheKey(cacheArea, topicName, additionalCacheKey), key -> {
try {
return supplier.call();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Unable to create producer for topic '" + topicName + "'", e);
}
});
}

public void close() {
if (closed.compareAndSet(false, true)) {
cache.invalidateAll();
try {
FutureUtil.waitForAll(closeFutures).get();
} catch (InterruptedException | ExecutionException e) {
log.warn("Failed to close producers", e);
}
}
}

@VisibleForTesting
public boolean containsKey(CacheArea cacheArea, String topic) {
return containsKey(cacheArea, topic, null);
}

@VisibleForTesting
public boolean containsKey(CacheArea cacheArea, String topic, Object additionalCacheKey) {
return cache.getIfPresent(new ProducerCacheKey(cacheArea, topic, additionalCacheKey)) != null;
}
}
Loading

0 comments on commit 6fe8100

Please sign in to comment.